
Commit a132c80

Fixes output compression
1 parent 9c6eb2d

2 files changed: 14 additions & 15 deletions

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 12 additions & 13 deletions

@@ -110,19 +110,6 @@ case class InsertIntoHiveTable(
     val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
     assert(outputFileFormatClassName != null, "Output format class not set")
     conf.value.set("mapred.output.format.class", outputFileFormatClassName)
-
-    val isCompressed = conf.value.getBoolean(
-      ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
-
-    if (isCompressed) {
-      // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
-      // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
-      // to store compression information.
-      conf.value.set("mapred.output.compress", "true")
-      fileSinkConf.setCompressed(true)
-      fileSinkConf.setCompressCodec(conf.value.get("mapred.output.compression.codec"))
-      fileSinkConf.setCompressType(conf.value.get("mapred.output.compression.type"))
-    }
     conf.value.setOutputCommitter(classOf[FileOutputCommitter])
 
     FileOutputFormat.setOutputPath(

@@ -181,6 +168,18 @@
     val tableLocation = table.hiveQlTable.getDataLocation
     val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
+    val isCompressed = sc.hiveconf.getBoolean(
+      ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
+
+    if (isCompressed) {
+      // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
+      // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
+      // to store compression information.
+      sc.hiveconf.set("mapred.output.compress", "true")
+      fileSinkConf.setCompressed(true)
+      fileSinkConf.setCompressCodec(sc.hiveconf.get("mapred.output.compression.codec"))
+      fileSinkConf.setCompressType(sc.hiveconf.get("mapred.output.compression.type"))
+    }
 
     val numDynamicPartitions = partition.values.count(_.isEmpty)
     val numStaticPartitions = partition.values.count(_.nonEmpty)
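
Taken together, the two hunks above move the compression setup from the serialized job configuration (conf.value) to the driver-side sc.hiveconf, right after the FileSinkDesc is created, so the descriptor already carries its compression settings when it is handed to the writer. A minimal sketch of the flag lookup, assuming Hive's HiveConf/ConfVars API as referenced in the diff (ConfVars.COMPRESSRESULT corresponds to hive.exec.compress.output and defaults to false; the object name is hypothetical):

import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars

object CompressFlagSketch {
  def main(args: Array[String]): Unit = {
    val hiveconf = new HiveConf()
    // Equivalent to running SET hive.exec.compress.output=true in a session.
    hiveconf.setBoolean(ConfVars.COMPRESSRESULT.varname, true)
    // The same lookup the patched code performs before configuring the sink.
    val isCompressed = hiveconf.getBoolean(
      ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
    println(s"${ConfVars.COMPRESSRESULT.varname} = $isCompressed") // true
  }
}

Note that, as the in-code comment says, none of these settings affect ORC output, which stores its compression choice in table properties.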

sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala

Lines changed: 2 additions & 2 deletions

@@ -183,8 +183,8 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
     val dynamicPartPath = dynamicPartColNames
       .zip(row.takeRight(dynamicPartColNames.length))
       .map { case (col, rawVal) =>
-        val string = String.valueOf(rawVal)
-        s"/$col=${if (rawVal == null || string.isEmpty) defaultPartName else string}"
+        val string = if (rawVal == null) null else String.valueOf(rawVal)
+        s"/$col=${if (string == null || string.isEmpty) defaultPartName else string}"
       }
       .mkString
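
This hunk guards the null case before stringifying: the value is converted with String.valueOf only when it is non-null, and the defaultPartName substitution now keys off the resulting string rather than the raw value. A self-contained sketch of the fixed mapping (the object name and inputs are hypothetical; the defaultPartName value mirrors Hive's default partition name):

object DynamicPartPathSketch {
  val defaultPartName = "__HIVE_DEFAULT_PARTITION__"

  // Mirrors the patched logic: null and empty values both map to the
  // default partition name; everything else is stringified as-is.
  def dynamicPartPath(cols: Seq[String], vals: Seq[Any]): String =
    cols.zip(vals).map { case (col, rawVal) =>
      val string = if (rawVal == null) null else String.valueOf(rawVal)
      s"/$col=${if (string == null || string.isEmpty) defaultPartName else string}"
    }.mkString

  def main(args: Array[String]): Unit = {
    println(dynamicPartPath(Seq("year", "city"), Seq(2014, null)))
    // /year=2014/city=__HIVE_DEFAULT_PARTITION__
    println(dynamicPartPath(Seq("tag"), Seq("")))
    // /tag=__HIVE_DEFAULT_PARTITION__
  }
}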
