
Commit 99271d6

Fix test issue
1 parent 365c5bf commit 99271d6

1 file changed: +106 -78 lines changed

sql/hive/src/test/scala/org/apache/spark/sql/hive/CompressionCodecSuite.scala

Lines changed: 106 additions & 78 deletions
@@ -48,7 +48,7 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
     }
   }
 
-  private val maxRecordNum = 500
+  private val maxRecordNum = 50
 
   private def getConvertMetastoreConfName(format: String): String = format.toLowerCase match {
     case "parquet" => HiveUtils.CONVERT_METASTORE_PARQUET.key
@@ -67,8 +67,8 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
 
   private def normalizeCodecName(format: String, name: String): String = {
     format.toLowerCase match {
-      case "parquet" => ParquetOptions.shortParquetCompressionCodecNames(name).name()
-      case "orc" => OrcOptions.shortOrcCompressionCodecNames(name)
+      case "parquet" => ParquetOptions.getParquetCompressionCodecName(name)
+      case "orc" => OrcOptions.getORCCompressionCodecName(name)
    }
  }
 
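Note on the normalizeCodecName change above: the test now goes through public accessors instead of the previously exposed internal codec-name maps. A minimal sketch of the expected behavior, assuming the accessors mirror the maps they replace (the literal results are illustrative, not taken from this commit):

    // Both helpers normalize a short codec alias to the canonical name that
    // the suite later reads back from the file metadata, e.g.:
    ParquetOptions.getParquetCompressionCodecName("snappy") // "SNAPPY"
    OrcOptions.getORCCompressionCodecName("zlib")           // "ZLIB"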
@@ -80,7 +80,7 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
         block <- footer.getParquetMetadata.getBlocks.asScala
         column <- block.getColumns.asScala
       } yield column.getCodec.name()
-    case "orc" => new File(path).listFiles().filter{ file =>
+    case "orc" => new File(path).listFiles().filter { file =>
       file.isFile && !file.getName.endsWith(".crc") && file.getName != "_SUCCESS"
     }.map { orcFile =>
       OrcFileOperator.getFileReader(orcFile.toPath.toString).get.getCompression.toString
@@ -112,8 +112,8 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
 
   private def writeDataToTable(
       tableName: String,
-      partition: Option[String]): Unit = {
-    val partitionInsert = partition.map(p => s"partition (p='$p')").mkString
+      partitionValue: Option[String]): Unit = {
+    val partitionInsert = partitionValue.map(p => s"partition (p='$p')").mkString
     sql(
       s"""
         |INSERT INTO TABLE $tableName
@@ -122,29 +122,69 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
       """.stripMargin)
   }
 
+  private def writeDateToTableUsingCTAS(
+      rootDir: File,
+      tableName: String,
+      partitionValue: Option[String],
+      format: String,
+      compressionCodec: Option[String]): Unit = {
+    val partitionCreate = partitionValue.map(p => s"PARTITIONED BY (p)").mkString
+    val compressionOption = compressionCodec.map { codec =>
+      s",'${getHiveCompressPropName(format)}'='$codec'"
+    }.mkString
+    val partitionSelect = partitionValue.map(p => s",'$p' AS p").mkString
+    sql(
+      s"""
+        |CREATE TABLE $tableName
+        |USING $format
+        |OPTIONS('path'='${rootDir.toURI.toString.stripSuffix("/")}/$tableName' $compressionOption)
+        |$partitionCreate
+        |AS SELECT * $partitionSelect FROM table_source
+      """.stripMargin)
+  }
+
+  private def getPreparedTablePath(
+      tmpDir: File,
+      tableName: String,
+      isPartitioned: Boolean,
+      format: String,
+      compressionCodec: Option[String],
+      usingCTAS: Boolean): String = {
+    val partitionValue = if (isPartitioned) Some("test") else None
+    if (usingCTAS) {
+      writeDateToTableUsingCTAS(tmpDir, tableName, partitionValue, format, compressionCodec)
+    } else {
+      createTable(tmpDir, tableName, isPartitioned, format, compressionCodec)
+      writeDataToTable(tableName, partitionValue)
+    }
+    getTablePartitionPath(tmpDir, tableName, partitionValue)
+  }
+
   private def getTableSize(path: String): Long = {
     val dir = new File(path)
     val files = dir.listFiles().filter(_.getName.startsWith("part-"))
     files.map(_.length()).sum
   }
 
-  private def getTablePartitionPath(dir: File, tableName: String, partition: Option[String]) = {
-    val partitionPath = partition.map(p => s"p=$p").mkString
+  private def getTablePartitionPath(
+      dir: File,
+      tableName: String,
+      partitionValue: Option[String]) = {
+    val partitionPath = partitionValue.map(p => s"p=$p").mkString
     s"${dir.getPath.stripSuffix("/")}/$tableName/$partitionPath"
   }
 
   private def getUncompressedDataSizeByFormat(
-      format: String, isPartitioned: Boolean): Long = {
+      format: String, isPartitioned: Boolean, usingCTAS: Boolean): Long = {
     var totalSize = 0L
     val tableName = s"tbl_$format"
     val codecName = normalizeCodecName(format, "uncompressed")
     withSQLConf(getSparkCompressionConfName(format) -> codecName) {
       withTempDir { tmpDir =>
         withTable(tableName) {
-          createTable(tmpDir, tableName, isPartitioned, format, Option(codecName))
-          val partition = if (isPartitioned) Some("test") else None
-          writeDataToTable(tableName, partition)
-          val path = getTablePartitionPath(tmpDir, tableName, partition)
+          val compressionCodec = Option(codecName)
+          val path = getPreparedTablePath(
+            tmpDir, tableName, isPartitioned, format, compressionCodec, usingCTAS)
           totalSize = getTableSize(path)
         }
       }
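For readers of the new writeDateToTableUsingCTAS helper above, a hedged sketch of the statement it interpolates for writeDateToTableUsingCTAS(dir, "tbl_parquet", Some("test"), "parquet", Some("SNAPPY")), assuming getHiveCompressPropName("parquet") resolves to "parquet.compression" (path abbreviated, spacing as produced by the interpolation):

    // CREATE TABLE tbl_parquet
    // USING parquet
    // OPTIONS('path'='file:/.../tbl_parquet' ,'parquet.compression'='SNAPPY')
    // PARTITIONED BY (p)
    // AS SELECT * ,'test' AS p FROM table_source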
@@ -156,15 +196,15 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
   private def checkCompressionCodecForTable(
       format: String,
       isPartitioned: Boolean,
-      compressionCodec: Option[String])
+      compressionCodec: Option[String],
+      usingCTAS: Boolean)
       (assertion: (String, Long) => Unit): Unit = {
-    val tableName = s"tbl_$format$isPartitioned"
+    val tableName =
+      if (usingCTAS) s"tbl_$format$isPartitioned" else s"tbl_$format${isPartitioned}_CAST"
     withTempDir { tmpDir =>
       withTable(tableName) {
-        createTable(tmpDir, tableName, isPartitioned, format, compressionCodec)
-        val partition = if (isPartitioned) Some("test") else None
-        writeDataToTable(tableName, partition)
-        val path = getTablePartitionPath(tmpDir, tableName, partition)
+        val path = getPreparedTablePath(
+          tmpDir, tableName, isPartitioned, format, compressionCodec, usingCTAS)
         val relCompressionCodecs = getTableCompressionCodec(path, format)
         assert(relCompressionCodecs.length == 1)
         val tableSize = getTableSize(path)
@@ -177,6 +217,7 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
       format: String,
       isPartitioned: Boolean,
       convertMetastore: Boolean,
+      usingCTAS: Boolean,
       compressionCodecs: List[String],
       tableCompressionCodecs: List[String])
       (assertionCompressionCodec: (Option[String], String, String, Long) => Unit): Unit = {
@@ -186,9 +227,10 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
       withSQLConf(getSparkCompressionConfName(format) -> sessionCompressionCodec) {
         // 'tableCompression = null' means no table-level compression
         val compression = Option(tableCompression)
-        checkCompressionCodecForTable(format, isPartitioned, compression) {
-          case (realCompressionCodec, tableSize) => assertionCompressionCodec(compression,
-            sessionCompressionCodec, realCompressionCodec, tableSize)
+        checkCompressionCodecForTable(format, isPartitioned, compression, usingCTAS) {
+          case (realCompressionCodec, tableSize) =>
+            assertionCompressionCodec(
+              compression, sessionCompressionCodec, realCompressionCodec, tableSize)
         }
       }
     }
@@ -203,55 +245,35 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
       compressionCodec: String,
       isPartitioned: Boolean,
       convertMetastore: Boolean,
+      usingCTAS: Boolean,
       tableSize: Long): Boolean = {
-    format match {
-      case "parquet" =>
-        val uncompressedSize = if (!convertMetastore || isPartitioned) {
-          getUncompressedDataSizeByFormat(format, isPartitioned = true)
-        } else {
-          getUncompressedDataSizeByFormat(format, isPartitioned = false)
-        }
-
-        if (compressionCodec == "UNCOMPRESSED") {
-          tableSize == uncompressedSize
-        } else {
-          tableSize != uncompressedSize
-        }
-      case "orc" =>
-        val uncompressedSize = if (!convertMetastore || isPartitioned) {
-          getUncompressedDataSizeByFormat(format, isPartitioned = true)
-        } else {
-          getUncompressedDataSizeByFormat(format, isPartitioned = false)
-        }
-
-        if (compressionCodec == "NONE") {
-          tableSize == uncompressedSize
-        } else {
-          tableSize != uncompressedSize
-        }
-      case _ => false
+    val uncompressedSize = getUncompressedDataSizeByFormat(format, isPartitioned, usingCTAS)
+    compressionCodec match {
+      case "UNCOMPRESSED" if format == "parquet" => tableSize == uncompressedSize
+      case "NONE" if format == "orc" => tableSize == uncompressedSize
+      case _ => tableSize != uncompressedSize
     }
   }
 
   def checkForTableWithCompressProp(format: String, compressCodecs: List[String]): Unit = {
     Seq(true, false).foreach { isPartitioned =>
       Seq(true, false).foreach { convertMetastore =>
-        checkTableCompressionCodecForCodecs(
-          format,
-          isPartitioned,
-          convertMetastore,
-          compressionCodecs = compressCodecs,
-          tableCompressionCodecs = compressCodecs) {
-          case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec, tableSize) =>
-            // For non-partitioned table and when convertMetastore is false, Expect session-level
-            // take effect, and in other cases expect table-level take effect
-            val expectCompressionCodec =
-              if (convertMetastore && !isPartitioned) sessionCompressionCodec
-              else tableCompressionCodec.get
-
-            assert(expectCompressionCodec == realCompressionCodec)
-            assert(checkTableSize(format, expectCompressionCodec,
-              isPartitioned, convertMetastore, tableSize))
+        Seq(true, false).foreach { usingCTAS =>
+          checkTableCompressionCodecForCodecs(
+            format,
+            isPartitioned,
+            convertMetastore,
+            usingCTAS,
+            compressionCodecs = compressCodecs,
+            tableCompressionCodecs = compressCodecs) {
+            case
+              (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec, tableSize) =>
+              // After SPARK-22926, table-level will take effect
+              val expectCompressionCodec = tableCompressionCodec.get
+              assert(expectCompressionCodec == realCompressionCodec)
+              assert(checkTableSize(format, expectCompressionCodec,
+                isPartitioned, convertMetastore, usingCTAS, tableSize))
+          }
         }
       }
     }
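The rewritten checkTableSize above folds the duplicated parquet/orc branches into a single match: only the format's own "no compression" marker is expected to leave the table at the uncompressed baseline size, and any real codec must change it. An illustrative summary of the resulting cases (not part of the commit):

    // checkTableSize("parquet", "UNCOMPRESSED", ...)  => tableSize == uncompressedSize
    // checkTableSize("orc", "NONE", ...)              => tableSize == uncompressedSize
    // any other codec (e.g. "SNAPPY", "ZLIB")         => tableSize != uncompressedSize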
@@ -260,17 +282,21 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
   def checkForTableWithoutCompressProp(format: String, compressCodecs: List[String]): Unit = {
     Seq(true, false).foreach { isPartitioned =>
       Seq(true, false).foreach { convertMetastore =>
-        checkTableCompressionCodecForCodecs(
-          format,
-          isPartitioned,
-          convertMetastore,
-          compressionCodecs = compressCodecs,
-          tableCompressionCodecs = List(null)) {
-          case (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec, tableSize) =>
-            // Always expect session-level take effect
-            assert(sessionCompressionCodec == realCompressionCodec)
-            assert(checkTableSize(format, sessionCompressionCodec,
-              isPartitioned, convertMetastore, tableSize))
+        Seq(true, false).foreach { usingCTAS =>
+          checkTableCompressionCodecForCodecs(
+            format,
+            isPartitioned,
+            convertMetastore,
+            usingCTAS,
+            compressionCodecs = compressCodecs,
+            tableCompressionCodecs = List(null)) {
+            case
+              (tableCompressionCodec, sessionCompressionCodec, realCompressionCodec, tableSize) =>
+              // Always expect session-level take effect
+              assert(sessionCompressionCodec == realCompressionCodec)
+              assert(checkTableSize(format, sessionCompressionCodec,
+                isPartitioned, convertMetastore, usingCTAS, tableSize))
+          }
         }
       }
     }
@@ -294,16 +320,18 @@ class CompressionCodecSuite extends TestHiveSingleton with ParquetTest with Befo
         createTable(tmpDir, tableName, isPartitioned, format, None)
         withTable(tableName) {
           compressCodecs.foreach { compressionCodec =>
-            val partition = if (isPartitioned) Some(compressionCodec) else None
+            val partitionValue = if (isPartitioned) Some(compressionCodec) else None
             withSQLConf(getConvertMetastoreConfName(format) -> convertMetastore.toString,
               getSparkCompressionConfName(format) -> compressionCodec
-            ) { writeDataToTable(tableName, partition) }
+            ) { writeDataToTable(tableName, partitionValue) }
           }
           val tablePath = getTablePartitionPath(tmpDir, tableName, None)
           val relCompressionCodecs =
             if (isPartitioned) compressCodecs.flatMap { codec =>
               getTableCompressionCodec(s"$tablePath/p=$codec", format)
-            } else getTableCompressionCodec(tablePath, format)
+            } else {
+              getTableCompressionCodec(tablePath, format)
+            }
 
           assert(relCompressionCodecs.distinct.sorted == compressCodecs.sorted)
           val recordsNum = sql(s"SELECT * from $tableName").count()
