diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index aa858e808edf..8c847b4a68f4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -309,12 +309,34 @@ case class InsertIntoHiveTable(
     @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
     saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
 
+    // Record the partition directories that were just written so their files can be merged below.
+    val partitionDirNames = ArrayBuffer[String]()
     val outputPath = FileOutputFormat.getOutputPath(jobConf)
     // TODO: Correctly set holdDDLTime.
     // In most of the time, we should have holdDDLTime = false.
     // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
     val holdDDLTime = false
     if (partition.nonEmpty) {
+      val fs = FileSystem.get(hadoopConf)
+      if (fs.exists(tmpLocation)) {
+        val tmpStatus = fs.getFileStatus(tmpLocation)
+        if (tmpStatus.isDirectory) {
+          val children = fs.listStatus(tmpLocation)
+          val firstLevelNames = children.map(_.getPath.getName)
+          if (partitionColumnNames.length == 1) {
+            partitionDirNames ++= firstLevelNames
+          } else if (partitionColumnNames.length == 2) {
+            children.foreach { child =>
+              val firstLevelName = child.getPath.getName
+              if (child.isDirectory) {
+                val grandChildren = fs.listStatus(child.getPath)
+                val secondLevelNames = grandChildren.map(c => firstLevelName + "/" + c.getPath.getName)
+                partitionDirNames ++= secondLevelNames
+              }
+            }
+          }
+        }
+      }
       if (numDynamicPartitions > 0) {
         externalCatalog.loadDynamicPartitions(
           db = table.catalogTable.database,
@@ -390,6 +412,62 @@ case class InsertIntoHiveTable(
         logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
     }
+    var source = ""
+    if (outputClass.getName.equalsIgnoreCase("org.apache.hadoop.hive.ql.io.orc.OrcSerde$OrcSerdeRow")) {
+      source = "orc"
+    } else if (outputClass.getName.equalsIgnoreCase("org.apache.hadoop.io.Text")) {
+      source = "text"
+    } else if (outputClass.getName.equalsIgnoreCase("org.apache.parquet.hadoop.ParquetOutputCommitter")) {
+      source = "parquet"
+    }
+    // Merge small output files by re-reading what was just written and rewriting it
+    // with fewer, larger files.
+    if (source == "orc" || source == "text") {
+      // Only non-partitioned tables and tables with at most two partition columns are handled.
+      if (partition.isEmpty || partitionColumnNames.length < 3) {
+        val blockSize = sqlContext.sparkContext.conf.getLong("hive.exec.orc.default.stripe.size", 268435456L)
+        if (partition.isEmpty) {
+          // Non-partitioned table: merge the files directly under the table location.
+          val fs = FileSystem.get(hadoopConf)
+          val tmpPath = tableLocation.toString + "_merges"
+          val fileSize = fs.getContentSummary(tableLocation).getLength
+          val numPartitions = (fileSize / blockSize + 1).toInt
+          val df = sqlContext.read.format(source).load(tableLocation.toString)
+            .coalesce(numPartitions)
+          df.write.format(source).save(tmpPath)
+
+          // Swap the merged output into place and drop the _SUCCESS marker file.
+          val srcPath = new Path(tmpPath)
+          val dstPath = tableLocation
+          if (fs.exists(dstPath)) fs.delete(dstPath, true)
+          fs.rename(srcPath, dstPath)
+          val successPath = new Path(dstPath, "_SUCCESS")
+          if (fs.exists(successPath)) fs.delete(successPath, true)
+          if (fs.exists(srcPath)) fs.delete(srcPath, true)
+        } else {
+          // Partitioned table: merge each partition directory collected above.
+          val fs = FileSystem.get(hadoopConf)
+          partitionDirNames.foreach { partitionDir =>
+            val partitionPath = tableLocation.toString + "/" + partitionDir
+            val mergePath = partitionPath + "_merges"
+            val fileSize = fs.getContentSummary(new Path(partitionPath)).getLength
+            val numPartitions = (fileSize / blockSize + 1).toInt
+            val df = sqlContext.read.format(source).load(partitionPath)
+              .coalesce(numPartitions)
+            df.write.format(source).save(mergePath)
+
+            // Swap the merged partition into place and drop the _SUCCESS marker file.
+            val srcPath = new Path(mergePath)
+            val dstPath = new Path(partitionPath)
+            if (fs.exists(dstPath)) fs.delete(dstPath, true)
+            fs.rename(srcPath, dstPath)
+            val successPath = new Path(dstPath, "_SUCCESS")
+            if (fs.exists(successPath)) fs.delete(successPath, true)
+            if (fs.exists(srcPath)) fs.delete(srcPath, true)
+          }
+        }
+      }
+    }
 
     // Invalidate the cache.
     sqlContext.sharedState.cacheManager.invalidateCache(table)
     sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
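For reference, a minimal standalone sketch (not part of the patch) of the merge heuristic applied above: estimate the number of output files as total size divided by a target file size plus one, coalesce to that many partitions, and write the result to a "<path>_merges" directory that is later renamed over the original location. It is written against the public SparkSession API rather than the internal sqlContext used in the patch; mergeSmallFiles, fmt, location, and targetFileSize are illustrative names, not identifiers from the patch.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

// Illustrative helper only: rewrite the data under `location` into roughly
// (totalSize / targetFileSize + 1) files of format `fmt` ("orc" or "text"),
// staging the merged output in "<location>_merges" as the patch does.
def mergeSmallFiles(spark: SparkSession, fmt: String, location: String, targetFileSize: Long): String = {
  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  val totalSize = fs.getContentSummary(new Path(location)).getLength
  val numFiles = (totalSize / targetFileSize + 1).toInt
  val merged = spark.read.format(fmt).load(location).coalesce(numFiles)
  val mergedPath = location + "_merges"
  merged.write.format(fmt).save(mergedPath)
  mergedPath // the caller renames this directory over `location`, mirroring the patch
}

A call such as mergeSmallFiles(spark, "orc", tablePath, 256L * 1024 * 1024) (with a hypothetical tablePath) mirrors the 256 MB default the patch reads from hive.exec.orc.default.stripe.size.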