@@ -24,7 +24,9 @@ import java.util.{HashMap => JHashMap}
 import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
 import org.apache.hadoop.hive.metastore.MetaStoreUtils
 import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.ql.ErrorMsg
 import org.apache.hadoop.hive.ql.metadata.Hive
+import org.apache.hadoop.hive.ql.parse.SemanticException
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.serde2.Serializer
 import org.apache.hadoop.hive.serde2.objectinspector._
@@ -40,6 +42,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
 import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter}
+import org.apache.hadoop.hive.conf.HiveConf

 /**
  * :: DeveloperApi ::
@@ -159,7 +162,7 @@ case class InsertIntoHiveTable(
     writer.commitJob()
   }

-  def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int): String = {
+  def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int, jobConf: JobConf): String = {
     dynamicPartNum2 match {
       case 0 => ""
       case i => {
@@ -169,18 +172,26 @@ case class InsertIntoHiveTable(
         var buf = new StringBuffer()
         if (partCols.length == dynamicPartNum2) {
           for (j <- 0 until partCols.length) {
-            buf.append("/").append(partCols(j)).append("=").append(row(j + row.length - colsNum))
+            buf.append("/").append(partCols(j)).append("=").append(handleNull(row(colsNum + j), jobConf))
           }
         } else {
           for (j <- 0 until dynamicPartNum2) {
-            buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(row(j + colsNum))
+            buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(handleNull(row(colsNum + j), jobConf))
           }
         }
         buf.toString
       }
     }
   }

+  def handleNull(obj: Any, jobConf: JobConf): String = {
+    if (obj == null || obj.toString.length == 0) {
+      jobConf.get("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__")
+    } else {
+      obj.toString
+    }
+  }
+
   override def execute() = result

   /**
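For reference, the path-building logic in this hunk can be read in isolation as the sketch below. The object, method, and sample column names are illustrative only (they are not part of the patch); the sketch just shows the "/col=value" directory suffix that getDynamicPartDir produces, and how a null or empty partition value falls back to Hive's default partition name.

// Illustrative sketch only: a standalone version of the dynamic-partition path
// construction, with hypothetical names (not taken from the patch).
object DynamicPartDirSketch {
  // Default used when hive.exec.default.partition.name is not set.
  val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

  // Map a null or empty partition value to Hive's default partition name.
  def handleNull(value: Any): String =
    if (value == null || value.toString.isEmpty) defaultPartitionName else value.toString

  // Build the "/col1=val1/col2=val2" suffix appended under the table's location.
  def dynamicPartDir(partCols: Seq[String], partValues: Seq[Any]): String =
    partCols.zip(partValues).map { case (col, v) => s"/$col=${handleNull(v)}" }.mkString

  def main(args: Array[String]): Unit = {
    // Prints "/ds=2014-01-01/hr=__HIVE_DEFAULT_PARTITION__"
    println(dynamicPartDir(Seq("ds", "hr"), Seq("2014-01-01", null)))
  }
}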
@@ -201,11 +212,38 @@ case class InsertIntoHiveTable(
     val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
     var dynamicPartNum = 0
+    var numStaPart = 0
     var dynamicPartPath = ""
     val partitionSpec = partition.map {
-      case (key, Some(value)) => key -> value
-      case (key, None) => { dynamicPartNum += 1; key -> "" } // Should not reach here right now.
+      case (key, Some(value)) => { numStaPart += 1; key -> value }
+      case (key, None) => { dynamicPartNum += 1; key -> "" }
     }
+    // ORC stores compression information in table properties. While, there are other formats
+    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
+    val jobConf = new JobConf(sc.hiveconf)
+    val jobConfSer = new SerializableWritable(jobConf)
+    // check if the partition spec is valid
+    if (dynamicPartNum > 0) {
+      if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
+        throw new SemanticException(
+          ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg())
+      }
+      if (numStaPart == 0 && sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
+        throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg());
+      }
+      // check if static partition appear after dynamic partitions
+      for ((k, v) <- partitionSpec) {
+        if (partitionSpec(k) == "") {
+          if (numStaPart > 0) { // found a DP, but there exists ST as subpartition
+            throw new SemanticException(
+              ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg());
+          }
+        } else {
+          numStaPart -= 1
+        }
+      }
+    }
+
     val rdd = childRdd.mapPartitions { iter =>
       val serializer = newSerializer(fileSinkConf.getTableInfo)
       val standardOI = ObjectInspectorUtils
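The validation block added above enforces three Hive rules before any data is written: dynamic partitioning must be enabled, strict mode requires at least one static partition column, and no static partition column may follow a dynamic one. A simplified standalone sketch of those checks, using an ordered (column, optional static value) list and plain error strings in place of HiveConf and ErrorMsg, could look like this (all names are hypothetical, not from the patch):

// Illustrative sketch only: the three dynamic-partition checks, decoupled from
// HiveConf and ErrorMsg.
object PartitionSpecChecksSketch {
  def validate(
      partition: Seq[(String, Option[String])],  // ordered partition spec
      dynamicPartitioningEnabled: Boolean,       // hive.exec.dynamic.partition
      strictMode: Boolean                        // hive.exec.dynamic.partition.mode == "strict"
  ): Unit = {
    val dynamicCount = partition.count(_._2.isEmpty)
    if (dynamicCount > 0) {
      // 1. Dynamic partitioning must be switched on.
      require(dynamicPartitioningEnabled, "dynamic partitioning is disabled")
      // 2. Strict mode requires at least one static partition column.
      require(!strictMode || dynamicCount < partition.size,
        "strict mode requires at least one static partition column")
      // 3. Static partition columns may not appear after a dynamic one.
      val firstDynamic = partition.indexWhere(_._2.isEmpty)
      require(partition.drop(firstDynamic).forall(_._2.isEmpty),
        "a static partition column may not follow a dynamic one")
    }
  }
}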
@@ -221,7 +259,7 @@ case class InsertIntoHiveTable(
         var i = 0
         while (i < fieldOIs.length) {
           if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) {
-            dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum)
+            dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum, jobConfSer.value)
           }
           // Casts Strings to HiveVarchars when necessary.
           outputData(i) = wrap(row(i), fieldOIs(i))
@@ -232,10 +270,6 @@ case class InsertIntoHiveTable(
       }
     }

-    // ORC stores compression information in table properties. While, there are other formats
-    // (e.g. RCFile) that rely on hadoop configurations to store compression information.
-    val jobConf = new JobConf(sc.hiveconf)
-    val jobConfSer = new SerializableWritable(jobConf)
     if (dynamicPartNum > 0) {
       if (outputClass == null) {
         throw new SparkException("Output value class not set")
@@ -300,8 +334,6 @@ case class InsertIntoHiveTable(
         v.commitJob()
       }
       writerMap.clear()
-      // writer.commitJob()
-
     } else {
       saveAsHiveFile(
         rdd,
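End to end, the point of this patch is to let a dynamic-partition INSERT run through Spark SQL's HiveContext. A hedged usage example follows; the table and column names are made up, and it assumes the Spark 1.x-era hql interface of HiveContext.

// Illustrative usage only; "target" and "src" are hypothetical tables whose last
// SELECT columns feed the dynamic partition columns ds and hr.
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

object DynamicPartitionInsertExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "DynamicPartitionInsertExample")
    val hiveContext = new HiveContext(sc)

    // Enable dynamic partitioning; "nonstrict" also allows every partition column to be dynamic.
    hiveContext.hql("SET hive.exec.dynamic.partition=true")
    hiveContext.hql("SET hive.exec.dynamic.partition.mode=nonstrict")

    // ds and hr carry no static values, so they are filled from the query output,
    // and null/empty values end up under __HIVE_DEFAULT_PARTITION__.
    hiveContext.hql(
      "INSERT OVERWRITE TABLE target PARTITION (ds, hr) SELECT key, value, ds, hr FROM src")
  }
}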