@@ -102,6 +102,11 @@ case class InsertIntoHiveTable(
     obj
   }
 
+  /**
+   * Each element of the RDD is a (Writable, String) pair: the String is the dynamic
+   * partition directory computed upstream (see
+   * "serializer.serialize(outputData, standardOI) -> dynamicPartPath").
+   */
   def saveAsHiveFile(
       rdd: RDD[(Writable, String)],
       valueClass: Class[_],
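A minimal standalone sketch (object name and sample values are made up; this is not the actual `SparkHiveHadoopWriter` API) of why each RDD element carries a String: downstream, the path component selects which per-partition writer receives the record, exactly like the `writerMap` lookup in the hunk below.

```scala
import scala.collection.mutable

object WriterDispatchSketch extends App {
  // (payload, dynamicPartPath) pairs; a null path would mean a purely static insert
  val records = Seq(("row-1", "/part2=a"), ("row-2", "/part2=b"), ("row-3", "/part2=a"))

  // one "writer" (here just a buffer) per dynamic partition directory
  val writers = mutable.HashMap.empty[String, StringBuilder]
  for ((payload, partPath) <- records) {
    writers.getOrElseUpdate(partPath, new StringBuilder).append(payload).append('\n')
  }
  writers.foreach { case (dir, buf) => print(s"$dir:\n$buf") }
}
```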
@@ -142,96 +147,94 @@ case class InsertIntoHiveTable(
     if (dynamicPartNum == 0) {
       writer = new SparkHiveHadoopWriter(conf.value, fileSinkConf)
       writer.preSetup()
+      sc.sparkContext.runJob(rdd, writeToFile _)
+      writer.commitJob()
     } else {
       writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter]
+      sc.sparkContext.runJob(rdd, writeToFile _)
+      for ((k, v) <- writerMap) {
+        v.commitJob()
+      }
+      writerMap.clear()
     }
 
     def writeToFile(context: TaskContext, iter: Iterator[(Writable, String)]) {
-      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
-      // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-      // writer for No Dynamic Partition
-      if (dynamicPartNum == 0) {
-        writer.setup(context.stageId, context.partitionId, attemptNumber)
-        writer.open()
-      }
+      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+      // around by taking a mod. We expect that no task will be attempted 2 billion times.
+      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
 
-      var count = 0
-      // writer for Dynamic Partition
-      var writer2: SparkHiveHadoopWriter = null
-      while (iter.hasNext) {
-        val record = iter.next()
-        count += 1
-        if (record._2 == null) { // without Dynamic Partition
-          writer.write(record._1)
-        } else { // for Dynamic Partition
-          val location = fileSinkConf.getDirName
-          val partLocation = location + record._2 // this is why the writer can write to different file
-          writer2 = writerMap.get(record._2) match {
-            case Some(writer) => writer
-            case None => {
-              val tempWriter = new SparkHiveHadoopWriter(conf.value,
-                new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
-              tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
-              tempWriter.open(record._2)
-              writerMap += (record._2 -> tempWriter)
-              tempWriter
-            }
-          }
-          writer2.write(record._1)
-        }
+      if (dynamicPartNum == 0) { // all partitions are static
+        writer.setup(context.stageId, context.partitionId, attemptNumber)
+        writer.open()
+        // a single writer handles every record
+        while (iter.hasNext) {
+          val record = iter.next()
+          writer.write(record._1)
         }
-      if (dynamicPartNum == 0) {
         writer.close()
         writer.commit()
-      } else {
+      } else { // there are dynamic partitions
+        while (iter.hasNext) {
+          val record = iter.next()
+          val location = fileSinkConf.getDirName
+          val partLocation = location + record._2 // each dynamic partition value writes to its own directory
+          def createNewWriter(): SparkHiveHadoopWriter = {
+            val tempWriter = new SparkHiveHadoopWriter(conf.value,
+              new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false))
+            tempWriter.setup(context.stageId, context.partitionId, attemptNumber)
+            tempWriter.open(record._2)
+            writerMap += (record._2 -> tempWriter)
+            tempWriter
+          }
+          val writer2 = writerMap.getOrElseUpdate(record._2, createNewWriter())
+          writer2.write(record._1)
+        }
         for ((k, v) <- writerMap) {
           v.close()
           v.commit()
         }
       }
-    }
-
-    sc.sparkContext.runJob(rdd, writeToFile _)
-    if (dynamicPartNum == 0) {
-      writer.commitJob()
-    } else {
-      for ((k, v) <- writerMap) {
-        v.commitJob()
-      }
-      writerMap.clear()
     }
   }
-  /*
-   * e.g.
-   * for sql: Insert.....tablename(part1,part2) select ....val1,val2 from ...
-   * return: /part1=val1/part2=val2
-   * for sql: Insert.....tablename(part1=val1,part2) select ....,val2 from ...
-   * return: /part2=val2
-   * for sql: Insert.....tablename(part1=val1,part2,part3) select ....,val2,val3 from ...
-   * return: /part2=val2/part3=val3
-   */
+
+  /**
+   * Returns the dynamic partition directory for the given row.
+   * @param partCols an array containing the string names of the partition columns
+   *
+   * We take the last dynamicPartNum elements of partCols and the last
+   * dynamicPartNum elements of the current row, and join them into the
+   * dynamic partition directory string.
+   * For example:
+   * for sql: Insert.....tablename(part1,part2) select ....val1,val2 from ...
+   * return: /part1=val1/part2=val2
+   * for sql: Insert.....tablename(part1=val1,part2) select ....,val2 from ...
+   * return: /part2=val2
+   * for sql: Insert.....tablename(part1=val1,part2,part3) select ....,val2,val3 from ...
+   * return: /part2=val2/part3=val3
+   */
   private def getDynamicPartDir(partCols: Array[String],
       row: Row,
       dynamicPartNum: Int,
       defaultPartName: String): String = {
     assert(dynamicPartNum > 0)
+    // TODO needs optimization
     partCols
       .takeRight(dynamicPartNum)
       .zip(row.takeRight(dynamicPartNum))
       .map { case (c, v) => s"/$c=${handleNull(v, defaultPartName)}" }
       .mkString
   }
 
-  /*
-   * if rowVal is null or "", will return HiveConf.get(hive.exec.default.partition.name) with default
-   */
-  private def handleNull(rowVal: Any, defaultPartName: String): String = {
-    if (rowVal == null || String.valueOf(rowVal).length == 0) {
-      defaultPartName
-    } else {
-      String.valueOf(rowVal)
-    }
+  /**
+   * Returns `rowVal` as a String.
+   * If `rowVal` is null or equal to "", returns the default partition name.
+   */
+  private def handleNull(rowVal: Any, defaultPartName: String): String = {
+    if (rowVal == null || String.valueOf(rowVal).length == 0) {
+      defaultPartName
+    } else {
+      String.valueOf(rowVal)
+    }
   }
 
   override def execute() = result
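To make the directory-string examples in the comment above concrete, here is a minimal standalone re-implementation of the same `takeRight`/`zip` logic over plain collections (the object name is made up and `Row` is simplified to `Seq[Any]`; only the method names mirror the patch):

```scala
object DynamicPartDirSketch extends App {
  val defaultPartName = "__HIVE_DEFAULT_PARTITION__"

  // mirrors handleNull: null or empty values fall back to the default partition name
  def handleNull(rowVal: Any): String =
    if (rowVal == null || String.valueOf(rowVal).isEmpty) defaultPartName else String.valueOf(rowVal)

  // mirrors getDynamicPartDir, with Row simplified to Seq[Any]
  def getDynamicPartDir(partCols: Array[String], row: Seq[Any], dynamicPartNum: Int): String =
    partCols.takeRight(dynamicPartNum)
      .zip(row.takeRight(dynamicPartNum))
      .map { case (c, v) => s"/$c=${handleNull(v)}" }
      .mkString

  // INSERT ... tablename(part1=val1, part2, part3) SELECT ..., val2, val3 FROM ...
  println(getDynamicPartDir(Array("part1", "part2", "part3"), Seq("val1", "val2", null), 2))
  // prints: /part2=val2/part3=__HIVE_DEFAULT_PARTITION__
}
```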
@@ -253,36 +256,36 @@ case class InsertIntoHiveTable(
     val tableLocation = table.hiveQlTable.getDataLocation
     val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
-    var tmpDynamicPartNum = 0
-    var numStaPart = 0
+
+    val numDynamicPartitions = partition.values.filter(_.isEmpty).size
+    val numStaticPartitions = partition.values.filter(_.isDefined).size
     val partitionSpec = partition.map {
       case (key, Some(value)) =>
-        numStaPart += 1
         key -> value
       case (key, None) =>
-        tmpDynamicPartNum += 1
         key -> ""
     }
-    val dynamicPartNum = tmpDynamicPartNum
+
     val jobConf = new JobConf(sc.hiveconf)
     val jobConfSer = new SerializableWritable(jobConf)
     // check if the partition spec is valid
-    if (dynamicPartNum > 0) {
+    if (numDynamicPartitions > 0) {
       if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())
       }
-      if (numStaPart == 0 &&
+      if (numStaticPartitions == 0 &&
         sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg())
       }
       // check if static partition appear after dynamic partitions
+      var tmpNumStaticPartitions = numStaticPartitions
       for ((k, v) <- partitionSpec) {
         if (partitionSpec(k) == "") {
-          if (numStaPart > 0) { // found a DP, but there exists ST as subpartition
+          if (tmpNumStaticPartitions > 0) { // found a dynamic partition, but a static partition still follows it
             throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg())
           }
         } else {
-          numStaPart -= 1
+          tmpNumStaticPartitions -= 1
         }
       }
     }
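A small self-contained sketch of the two counts and the ordering check above, under the assumption that the partition spec preserves declaration order (a `ListMap` here; the object name and column values are illustrative): dynamic columns are the `None` entries, and a static column may not follow a dynamic one.

```scala
import scala.collection.immutable.ListMap

object PartitionSpecCheckSketch extends App {
  // INSERT ... tablename(part1=val1, part2) ... : part1 is static, part2 is dynamic
  val partition = ListMap("part1" -> Some("val1"), "part2" -> None)

  val numDynamicPartitions = partition.values.count(_.isEmpty)
  val numStaticPartitions = partition.values.count(_.isDefined)

  // reject a static partition that appears after a dynamic one
  var remainingStatic = numStaticPartitions
  for ((k, v) <- partition) {
    if (v.isEmpty) {
      require(remainingStatic == 0, s"a static partition follows dynamic partition $k")
    } else {
      remainingStatic -= 1
    }
  }
  println(s"dynamic = $numDynamicPartitions, static = $numStaticPartitions") // dynamic = 1, static = 1
}
```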
@@ -299,27 +302,24 @@ case class InsertIntoHiveTable(
       val outputData = new Array[Any](fieldOIs.length)
       val defaultPartName = jobConfSer.value.get(
         "hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__")
-      var partColStr: Array[String] = null;
-      if (fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") != null) {
-        partColStr = fileSinkConf
-          .getTableInfo
-          .getProperties
-          .getProperty("partition_columns")
-          .split("/")
-      }
+
+      val partitionColumns = fileSinkConf.getTableInfo.
+        getProperties.getProperty("partition_columns") // a String like "colname1/colname2"
+      val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull
 
       iter.map { row =>
         var dynamicPartPath: String = null
-        if (dynamicPartNum > 0) {
-          dynamicPartPath = getDynamicPartDir(partColStr, row, dynamicPartNum, defaultPartName)
+        if (numDynamicPartitions > 0) {
+          dynamicPartPath = getDynamicPartDir(partitionColumnNames, row,
+            numDynamicPartitions, defaultPartName)
         }
         var i = 0
         while (i < fieldOIs.length) {
           // Casts Strings to HiveVarchars when necessary.
           outputData(i) = wrap(row(i), fieldOIs(i))
           i += 1
         }
-
+        // pass the dynamicPartPath to the downstream RDD
         serializer.serialize(outputData, standardOI) -> dynamicPartPath
       }
     }
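A tiny standalone illustration of the `Option`-based rewrite above: the "partition_columns" table property is a "/"-separated string that may be absent, and `Option(...).map(_.split("/")).orNull` keeps the old null-when-missing behaviour without the mutable var (the object name and sample values are made up):

```scala
object PartitionColumnsSketch extends App {
  def columnNames(prop: String): Array[String] = Option(prop).map(_.split("/")).orNull

  println(columnNames("part1/part2").mkString(", ")) // part1, part2
  println(columnNames(null))                         // null: the table has no partition columns
}
```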
@@ -329,7 +329,7 @@ case class InsertIntoHiveTable(
       fileSinkConf,
       jobConfSer,
       sc.hiveconf.getBoolean("hive.exec.compress.output", false),
-      dynamicPartNum)
+      numDynamicPartitions)
 
     val outputPath = FileOutputFormat.getOutputPath(jobConf)
     // Have to construct the format of dbname.tablename.
@@ -346,13 +346,13 @@ case class InsertIntoHiveTable(
     val inheritTableSpecs = true
     // TODO: Correctly set isSkewedStoreAsSubdir.
     val isSkewedStoreAsSubdir = false
-    if (dynamicPartNum > 0) {
+    if (numDynamicPartitions > 0) {
       db.loadDynamicPartitions(
         outputPath,
         qualifiedTableName,
         partitionSpec,
         overwrite,
-        dynamicPartNum,
+        numDynamicPartitions,
         holdDDLTime,
         isSkewedStoreAsSubdir
       )