@@ -103,17 +103,16 @@ case class InsertIntoHiveTable(
103103 valueClass : Class [_],
104104 fileSinkConf : FileSinkDesc ,
105105 conf : SerializableWritable [JobConf ],
106- isCompressed : Boolean ,
107106 writerContainer : SparkHiveWriterContainer ) {
108107   assert(valueClass != null, "Output value class not set")
109108 conf.value.setOutputValueClass(valueClass)
110109
111-   assert(fileSinkConf.getTableInfo.getOutputFileFormatClassName != null)
112- // Doesn't work in Scala 2.9 due to what may be a generics bug
113- // TODO: Should we uncomment this for Scala 2.10?
114- // conf.setOutputFormat(outputFormatClass)
115-   conf.value.set(
116-     "mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
110+ val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
111+   assert(outputFileFormatClassName != null, "Output format class not set")
112+   conf.value.set("mapred.output.format.class", outputFileFormatClassName)
113+
114+   val isCompressed = conf.value.getBoolean(
115+     ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
117116
118117 if (isCompressed) {
119118 // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
@@ -218,28 +217,14 @@ case class InsertIntoHiveTable(
218217 val jobConf = new JobConf (sc.hiveconf)
219218 val jobConfSer = new SerializableWritable (jobConf)
220219
221- val defaultPartName = jobConf.get(
222-     ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)
223220 val writerContainer = if (numDynamicPartitions > 0 ) {
224- new SparkHiveDynamicPartitionWriterContainer (
225- jobConf,
226- fileSinkConf,
227- partitionColumnNames.takeRight(numDynamicPartitions),
228- defaultPartName)
221+ val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
222+ new SparkHiveDynamicPartitionWriterContainer (jobConf, fileSinkConf, dynamicPartColNames)
229223 } else {
230224 new SparkHiveWriterContainer (jobConf, fileSinkConf)
231225 }
232226
233- val isCompressed = jobConf.getBoolean(
234-     ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
235-
236- saveAsHiveFile(
237- child.execute(),
238- outputClass,
239- fileSinkConf,
240- jobConfSer,
241- isCompressed,
242- writerContainer)
227+ saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
243228
244229 val outputPath = FileOutputFormat .getOutputPath(jobConf)
245230 // Have to construct the format of dbname.tablename.
0 commit comments