@@ -309,12 +309,33 @@ case class InsertIntoHiveTable(
@transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)

val pathnames = ArrayBuffer[String]()
val outputPath = FileOutputFormat.getOutputPath(jobConf)
// TODO: Correctly set holdDDLTime.
// Most of the time holdDDLTime should be false; it is true only when the
// TOK_HOLD_DDLTIME hint is present in the query.
val holdDDLTime = false
if (partition.nonEmpty) {
  // Record the relative path of every partition directory written under the
  // temporary location so each one can be merged individually below.
  val fs = FileSystem.get(hadoopConf)
  if (fs.exists(tmpLocation)) {
    val tmpStatus = fs.getFileStatus(tmpLocation)
    if (tmpStatus.isDirectory) {
      val children = fs.listStatus(tmpLocation)
      if (partitionColumnNames.length == 1) {
        pathnames ++= children.map(_.getPath.getName)
      } else if (partitionColumnNames.length == 2) {
        // Two partition columns: collect "first/second" relative paths.
        children.foreach { child =>
          val firstLevel = child.getPath.getName
          if (child.isDirectory) {
            pathnames ++= fs.listStatus(child.getPath)
              .map(c => firstLevel + "/" + c.getPath.getName)
          }
        }
      }
    }
  }
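  // Sketch of a possible generalization (illustrative only, not part of this
  // change): a recursive walk over tmpLocation would support any partition
  // depth instead of hard-coding the one- and two-level cases above. The
  // helper name is hypothetical.
  def collectLeafDirs(dir: Path, prefix: String): Seq[String] = {
    fs.listStatus(dir).toSeq.filter(_.isDirectory).flatMap { d =>
      val rel = if (prefix.isEmpty) d.getPath.getName
                else prefix + "/" + d.getPath.getName
      val nested = collectLeafDirs(d.getPath, rel)
      // A directory with no subdirectories is a leaf partition directory.
      if (nested.isEmpty) Seq(rel) else nested
    }
  }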
if (numDynamicPartitions > 0) {
externalCatalog.loadDynamicPartitions(
db = table.catalogTable.database,
@@ -390,6 +411,64 @@ case class InsertIntoHiveTable(
logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
}

var source = ""
if(outputClass.getName.equalsIgnoreCase("org.apache.hadoop.hive.ql.io.orc.OrcSerde$OrcSerdeRow")){
source = "orc"
}else if (outputClass.getName.equalsIgnoreCase("org.apache.hadoop.io.Text")){
source = "text"
} else if (outputClass.getName.equalsIgnoreCase("org.apache.parquet.hadoop.ParquetOutputCommitter")){
source = "parquet"
}
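// For example: a table stored as ORC serializes rows as OrcSerde$OrcSerdeRow,
// and a TEXTFILE table (LazySimpleSerDe) serializes rows as Text, so inserts
// into those tables take the merge path below.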
// Merge the small files written by this insert into larger ones. Merging is
// only done for ORC and text output, and only for tables with at most two
// partition columns.
if (source == "orc" || source == "text") {
  if (partition.isEmpty || partitionColumnNames.length <= 2) {
    val tmppath = tableLocation.toString + "_merges"
    // Target size per merged file, defaulting to the 256 MB ORC stripe size.
    val blocksize = sqlContext.sparkContext.conf.getLong(
      "hive.exec.orc.default.stripe.size", 268435456)
    if (partition.isEmpty) {
      // Non-partitioned table: merge the table directory in place.
      val fs = FileSystem.get(hadoopConf)
      val filesize = fs.getContentSummary(tableLocation).getLength
      val numpartition = (filesize / blocksize + 1).toInt
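      // Example: a 1 GiB result with the default 256 MiB target size yields
      // 1073741824 / 268435456 + 1 = 5 merged files.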
      val df = sqlContext.read.format(source).load(tableLocation.toString)
        .coalesce(numpartition)
      df.write.format(source).save(tmppath)

      // Swap the merged output into place: delete the original directory,
      // rename the merged one onto it, then drop the _SUCCESS marker.
      val srcPath = new Path(tmppath)
      val dstPath = tableLocation

      if (fs.exists(dstPath)) fs.delete(dstPath, true)
      fs.rename(srcPath, dstPath)
      val successpath = new Path(tableLocation.toString + "/_SUCCESS")
      if (fs.exists(successpath)) fs.delete(successpath, true)
      if (fs.exists(srcPath)) fs.delete(srcPath, true)
    } else {
      // Partitioned table (one or two partition columns): merge each
      // partition directory recorded in pathnames separately.
      val fs = FileSystem.get(hadoopConf)
      pathnames.foreach { relPath =>
        val partitionTmpPath = tableLocation.toString + "/" + relPath
        val partitionMergePath = partitionTmpPath + "_merges"
        val filesize = fs.getContentSummary(new Path(partitionTmpPath)).getLength
        val numpartition = (filesize / blocksize + 1).toInt
        val df = sqlContext.read.format(source).load(partitionTmpPath)
          .coalesce(numpartition)
        df.write.format(source).save(partitionMergePath)

        // Same swap as in the non-partitioned branch, applied per partition.
        val srcPath = new Path(partitionMergePath)
        val dstPath = new Path(partitionTmpPath)

        if (fs.exists(dstPath)) fs.delete(dstPath, true)
        fs.rename(srcPath, dstPath)
        val successpath = new Path(partitionTmpPath + "/_SUCCESS")
        if (fs.exists(successpath)) fs.delete(successpath, true)
        if (fs.exists(srcPath)) fs.delete(srcPath, true)
      }
    }
  }
}
// end
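// Sketch only (illustrative, not part of this change): the two merge branches
// above share the same measure -> coalesce -> rewrite -> swap sequence, which
// a local helper along these lines could factor out. The name mergeDirectory
// is hypothetical; blockSize is a parameter because the value above is scoped
// to the merge block.
def mergeDirectory(fs: FileSystem, dir: Path, blockSize: Long): Unit = {
  val mergePath = new Path(dir.toString + "_merges")
  val numFiles = (fs.getContentSummary(dir).getLength / blockSize + 1).toInt
  sqlContext.read.format(source).load(dir.toString).coalesce(numFiles)
    .write.format(source).save(mergePath.toString)
  if (fs.exists(dir)) fs.delete(dir, true)  // drop the small-file directory
  fs.rename(mergePath, dir)                 // move merged output into place
  val marker = new Path(dir, "_SUCCESS")
  if (fs.exists(marker)) fs.delete(marker, true)
}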
// Invalidate the cache.
sqlContext.sharedState.cacheManager.invalidateCache(table)
sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)