[SPARK-21786][SQL] The 'spark.sql.parquet.compression.codec' and 'spark.sql.orc.compression.codec' configuration doesn't take effect on hive table writing #20087
@@ -27,7 +27,7 @@ import org.apache.spark.sql.internal.SQLConf
 /**
  * Options for the Parquet data source.
  */
-private[parquet] class ParquetOptions(
+class ParquetOptions(
     @transient private val parameters: CaseInsensitiveMap[String],
     @transient private val sqlConf: SQLConf)
   extends Serializable {

@@ -68,7 +68,7 @@ object ParquetOptions {
   val MERGE_SCHEMA = "mergeSchema"

   // The parquet compression short names
-  private val shortParquetCompressionCodecNames = Map(
+  val shortParquetCompressionCodecNames = Map(
     "none" -> CompressionCodecName.UNCOMPRESSED,
     "uncompressed" -> CompressionCodecName.UNCOMPRESSED,
     "snappy" -> CompressionCodecName.SNAPPY,
@@ -68,6 +68,10 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand
         .get("mapreduce.output.fileoutputformat.compress.type"))
     }

+    // Set compression by priority
+    HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
+      .foreach { case (compression, codec) => hadoopConf.set(compression, codec) }
+
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
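The body of `HiveOptions.getHiveWriteCompression` is not shown in this hunk. As a rough, hedged sketch of the priority it is meant to apply (the helper below and its use of the `parquet.compression` key are illustrative assumptions, not the actual implementation): a codec declared on the table wins, otherwise the session-level SQL conf value is used.

```scala
// Minimal sketch of the priority described above; NOT the actual
// HiveOptions.getHiveWriteCompression implementation.
def resolveParquetWriteCompression(
    tableProperties: Map[String, String],  // e.g. from TBLPROPERTIES of the Hive table
    sessionCodec: String                   // e.g. spark.sql.parquet.compression.codec
  ): (String, String) = {
  // A table-level "parquet.compression" property takes precedence; otherwise
  // fall back to the session-level codec from SQLConf.
  val codec = tableProperties.getOrElse("parquet.compression", sessionCodec)
  "parquet.compression" -> codec
}
```

The resolved key/value pair is then pushed into the Hadoop configuration before the write job starts, mirroring the `hadoopConf.set(compression, codec)` call in the hunk above.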
@@ -0,0 +1,317 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive

import java.io.File

import scala.collection.JavaConverters._

import org.scalatest.BeforeAndAfterAll

import org.apache.hadoop.fs.Path
import org.apache.orc.OrcConf.COMPRESS
import org.apache.parquet.hadoop.ParquetOutputFormat

import org.apache.spark.sql.execution.datasources.orc.OrcOptions
import org.apache.spark.sql.execution.datasources.parquet.{ParquetOptions, ParquetTest}
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf

class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with BeforeAndAfterAll {
  import spark.implicits._

  override def beforeAll(): Unit = {
    super.beforeAll()
    (0 until maxRecordNum).toDF("a").createOrReplaceTempView("table_source")
  }

  override def afterAll(): Unit = {
    try {
      spark.catalog.dropTempView("table_source")
    } finally {
      super.afterAll()
    }
  }

  private val maxRecordNum = 500

  private def getConvertMetastoreConfName(format: String): String = format.toLowerCase match {
    case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
    case "orc" => HiveUtils.CONVERT_METASTORE_ORC.key
  }

  private def getSparkCompressionConfName(format: String): String = format.toLowerCase match {
    case "parquet" => SQLConf.PARQUET_COMPRESSION.key
    case "orc" => SQLConf.ORC_COMPRESSION.key
  }

  private def getHiveCompressPropName(format: String): String = format.toLowerCase match {
    case "parquet" => ParquetOutputFormat.COMPRESSION
    case "orc" => COMPRESS.getAttribute
  }

  private def normalizeCodecName(format: String, name: String): String = {
    format.toLowerCase match {
      case "parquet" => ParquetOptions.shortParquetCompressionCodecNames(name).name()
      case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
    }
  }

  private def getTableCompressionCodec(path: String, format: String): Seq[String] = {
    val hadoopConf = spark.sessionState.newHadoopConf()
    val codecs = format.toLowerCase match {
      case "parquet" => for {
        footer <- readAllFootersWithoutSummaryFiles(new Path(path), hadoopConf)
        block <- footer.getParquetMetadata.getBlocks.asScala
        column <- block.getColumns.asScala
      } yield column.getCodec.name()
      case "orc" => new File(path).listFiles().filter { file =>
        file.isFile && !file.getName.endsWith(".crc") && file.getName != "_SUCCESS"
      }.map { orcFile =>
        OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
      }.toSeq
    }
    codecs.distinct
  }

  private def createTable(
      rootDir: File,
      tableName: String,
      isPartitioned: Boolean,
      format: String,
      compressionCodec: Option[String]): Unit = {
    val tblProperties = compressionCodec match {
      case Some(prop) => s"TBLPROPERTIES('${getHiveCompressPropName(format)}'='$prop')"
      case _ => ""
    }
    val partitionCreate = if (isPartitioned) "PARTITIONED BY (p string)" else ""
    sql(
      s"""
        |CREATE TABLE $tableName(a int)
        |$partitionCreate
        |STORED AS $format
        |LOCATION '${rootDir.toURI.toString.stripSuffix("/")}/$tableName'
        |$tblProperties
      """.stripMargin)
  }

  private def writeDataToTable(
      tableName: String,
      partition: Option[String]): Unit = {
    val partitionInsert = partition.map(p => s"partition ($p)").mkString
    sql(
Member:
This is INSERT after CREATE TABLE. We also need to test/fix another common case: CTAS (CREATE TABLE AS SELECT).

Contributor (Author):
A CTAS statement is not allowed to create a partitioned table using Hive's file formats, so I use the CREATE TABLE plus INSERT syntax here. However, CTAS seems to behave differently from a non-partitioned Hive table when convertMetastore is true: for a non-partitioned Hive table the session-level setting takes effect, but for a table created by CTAS the table-level setting takes effect. If I merge the code of your PR (#20120), they become consistent and the table-level compression takes effect. Should I fix it after your PR is closed?

Member:
We can merge this PR first. Will ping you when my PR is fixed. Thanks!
      s"""
        |INSERT INTO TABLE $tableName
        |$partitionInsert
        |SELECT * FROM table_source
      """.stripMargin)
  }

  private def getTableSize(path: String): Long = {
    val dir = new File(path)
    val files = dir.listFiles().filter(_.getName.startsWith("part-"))
    files.map(_.length()).sum
  }

  private def getUncompressedDataSizeByFormat(
      format: String, isPartitioned: Boolean): Long = {
    var totalSize = 0L
    val tableName = s"tbl_$format"
    val codecName = normalizeCodecName(format, "uncompressed")
    withSQLConf(getSparkCompressionConfName(format) -> codecName) {
      withTempDir { tmpDir =>
        withTable(tableName) {
          createTable(tmpDir, tableName, isPartitioned, format, Option(codecName))
          val partition = if (isPartitioned) Some("p='test'") else None
          writeDataToTable(tableName, partition)
          val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName/${partition.mkString}"
          totalSize = getTableSize(path)
        }
      }
    }
    assert(totalSize > 0L)
    totalSize
  }

  private def checkCompressionCodecForTable(
      format: String,
      isPartitioned: Boolean,
      compressionCodec: Option[String])
      (assertion: (String, Long) => Unit): Unit = {
    val tableName = s"tbl_$format$isPartitioned"
    withTempDir { tmpDir =>
      withTable(tableName) {
        createTable(tmpDir, tableName, isPartitioned, format, compressionCodec)
        val partition = if (isPartitioned) Some("p='test'") else None
        writeDataToTable(tableName, partition)
        val path = s"${tmpDir.getPath.stripSuffix("/")}/$tableName/${partition.mkString}"
        val relCompressionCodecs = getTableCompressionCodec(path, format)
        assert(relCompressionCodecs.length == 1)
        val tableSize = getTableSize(path)
        assertion(relCompressionCodecs.head, tableSize)
      }
    }
  }

  private def checkTableCompressionCodecForCodecs(
      format: String,
      isPartitioned: Boolean,
      convertMetastore: Boolean,
      compressionCodecs: List[String],
      tableCompressionCodecs: List[String])
      (assertionCompressionCodec: (Option[String], String, String, Long) => Unit): Unit = {
    withSQLConf(getConvertMetastoreConfName(format) -> convertMetastore.toString) {
      tableCompressionCodecs.foreach { tableCompression =>
Member:
You are setting compressionCodecs and tableCompressionCodecs to the same list. I think we can set them to different ones to know which one is being used.

Contributor (Author):
Shouldn't we test the precedence of the compression codecs in effect? Here I just want to verify the precedence.

Member:
I see.
        compressionCodecs.foreach { sessionCompressionCodec =>
          withSQLConf(getSparkCompressionConfName(format) -> sessionCompressionCodec) {
            // 'tableCompression = null' means no table-level compression
            val compression = Option(tableCompression)
            checkCompressionCodecForTable(format, isPartitioned, compression) {
              case (realCompressionCodec, tableSize) => assertionCompressionCodec(compression,
                sessionCompressionCodec, realCompressionCodec, tableSize)
            }
          }
        }
      }
    }
  }

  // When the amount of data is small, the compressed size may be larger than the uncompressed
  // one, so we only check the size difference when the codec is not NONE or UNCOMPRESSED.
  private def checkTableSize(
      format: String,
      compressionCodec: String,
      isPartitioned: Boolean,
      convertMetastore: Boolean,
      tableSize: Long): Boolean = {
    format match {
      case "parquet" =>
        val uncompressedSize = if (!convertMetastore || isPartitioned) {
          getUncompressedDataSizeByFormat(format, isPartitioned = true)
        } else {
          getUncompressedDataSizeByFormat(format, isPartitioned = false)
        }

        if (compressionCodec == "UNCOMPRESSED") {
          tableSize == uncompressedSize
        } else {
          tableSize != uncompressedSize
        }
      case "orc" =>
        val uncompressedSize = if (!convertMetastore || isPartitioned) {
          getUncompressedDataSizeByFormat(format, isPartitioned = true)
        } else {
          getUncompressedDataSizeByFormat(format, isPartitioned = false)
        }

        if (compressionCodec == "NONE") {
          tableSize == uncompressedSize
        } else {
          tableSize != uncompressedSize
        }
      case _ => false
    }
  }

  def checkForTableWithCompressProp(format: String, compressCodecs: List[String]): Unit = {
    Seq(true, false).foreach { isPartitioned =>
      Seq(true, false).foreach { convertMetastore =>
        checkTableCompressionCodecForCodecs(
          format,
          isPartitioned,
          convertMetastore,
          compressionCodecs = compressCodecs,
          tableCompressionCodecs = compressCodecs) {
          case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec, tableSize) =>
            // For a non-partitioned table with convertMetastore enabled, the session-level codec
            // is expected to take effect; in all other cases the table-level codec is.
            val expectCompressionCodec =
              if (convertMetastore && !isPartitioned) sessionCompressionCodec
              else tableCompressionCodec.get

            assert(expectCompressionCodec == realCompressionCodec)
            assert(checkTableSize(format, expectCompressionCodec,
              isPartitioned, convertMetastore, tableSize))
        }
      }
    }
  }

  def checkForTableWithoutCompressProp(format: String, compressCodecs: List[String]): Unit = {
    Seq(true, false).foreach { isPartitioned =>
      Seq(true, false).foreach { convertMetastore =>
        checkTableCompressionCodecForCodecs(
          format,
          isPartitioned,
          convertMetastore,
          compressionCodecs = compressCodecs,
          tableCompressionCodecs = List(null)) {
          case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec, tableSize) =>
            // The session-level codec is always expected to take effect.
            assert(sessionCompressionCodec == realCompressionCodec)
            assert(checkTableSize(format, sessionCompressionCodec,
              isPartitioned, convertMetastore, tableSize))
        }
      }
    }
  }

  test("both table-level and session-level compression are set") {
    checkForTableWithCompressProp("parquet", List("UNCOMPRESSED", "SNAPPY", "GZIP"))
    checkForTableWithCompressProp("orc", List("NONE", "SNAPPY", "ZLIB"))
  }

  test("table-level compression is not set but session-level compression is set") {
    checkForTableWithoutCompressProp("parquet", List("UNCOMPRESSED", "SNAPPY", "GZIP"))
    checkForTableWithoutCompressProp("orc", List("NONE", "SNAPPY", "ZLIB"))
  }

  def checkTableWriteWithCompressionCodecs(format: String, compressCodecs: List[String]): Unit = {
    Seq(true, false).foreach { isPartitioned =>
      Seq(true, false).foreach { convertMetastore =>
        withTempDir { tmpDir =>
          val tableName = s"tbl_$format$isPartitioned"
          createTable(tmpDir, tableName, isPartitioned, format, None)
          withTable(tableName) {
            compressCodecs.foreach { compressionCodec =>
              val partition = if (isPartitioned) Some(s"p='$compressionCodec'") else None
              withSQLConf(getConvertMetastoreConfName(format) -> convertMetastore.toString,
                getSparkCompressionConfName(format) -> compressionCodec
              ) { writeDataToTable(tableName, partition) }
            }
            val tablePath = s"${tmpDir.getPath.stripSuffix("/")}/$tableName"
            val relCompressionCodecs =
              if (isPartitioned) compressCodecs.flatMap { codec =>
                getTableCompressionCodec(s"$tablePath/p=$codec", format)
              } else getTableCompressionCodec(tablePath, format)

            assert(relCompressionCodecs.distinct.sorted == compressCodecs.sorted)
            val recordsNum = sql(s"SELECT * from $tableName").count()
            assert(recordsNum == maxRecordNum * compressCodecs.length)
          }
        }
      }
    }
  }

  test("test table containing mixed compression codec") {
    checkTableWriteWithCompressionCodecs("parquet", List("UNCOMPRESSED", "SNAPPY", "GZIP"))
    checkTableWriteWithCompressionCodecs("orc", List("NONE", "SNAPPY", "ZLIB"))
  }
}
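Tying back to the review exchange about passing the same codec list at both levels: below is a hedged sketch of the alternative the reviewer describes, added here only for illustration and not part of this PR. It reuses the suite's `checkTableCompressionCodecForCodecs` helper with two different codecs, and the expected winner assumes the precedence the suite itself asserts (session-level wins for a non-partitioned table when convertMetastore is enabled).

```scala
// Hypothetical extra test (not in this PR): use different table-level and
// session-level codecs so the codec found in the files shows which one won.
test("different table-level and session-level compression codecs") {
  checkTableCompressionCodecForCodecs(
    format = "parquet",
    isPartitioned = false,
    convertMetastore = true,
    compressionCodecs = List("SNAPPY"),       // session-level codec
    tableCompressionCodecs = List("GZIP")) {  // table-level TBLPROPERTIES codec
    case (tableCodec, sessionCodec, realCodec, _) =>
      assert(tableCodec.contains("GZIP"))
      // With convertMetastore enabled and no partitions, the session-level
      // codec is expected to take effect, so the files should be SNAPPY.
      assert(realCodec == sessionCodec)
  }
}
```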
Review comment: Instead of changing the access modifiers, add a public function.
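One way to read that suggestion (a hedged sketch; the method name, the extra "gzip" entry, and its placement are assumptions, not code from this PR) is to keep the map private and expose a small lookup on the `ParquetOptions` companion object instead:

```scala
import java.util.Locale

import org.apache.parquet.hadoop.metadata.CompressionCodecName

object ParquetOptions {
  // ... existing members elided ...

  // The parquet compression short names (kept private, as before this PR).
  private val shortParquetCompressionCodecNames = Map(
    "none" -> CompressionCodecName.UNCOMPRESSED,
    "uncompressed" -> CompressionCodecName.UNCOMPRESSED,
    "snappy" -> CompressionCodecName.SNAPPY,
    "gzip" -> CompressionCodecName.GZIP)

  // Hypothetical public helper so callers (such as the new test suite) can
  // resolve a short codec name without widening the visibility of the map.
  def getParquetCompressionCodecName(name: String): String =
    shortParquetCompressionCodecNames(name.toLowerCase(Locale.ROOT)).name()
}
```

Tests could then call `ParquetOptions.getParquetCompressionCodecName("snappy")` instead of reaching into the map directly.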