From ba63a06c1b54df903046146a69ca6d1a1acb5bef Mon Sep 17 00:00:00 2001
From: wuzhilon <358471559@qq.com>
Date: Fri, 30 Jun 2017 14:30:07 +0800
Subject: [PATCH 1/3] Update InsertIntoHiveTable.scala

Merge small Hive files into larger files; supports the ORC and text table
storage formats.
---
 .../hive/execution/InsertIntoHiveTable.scala | 78 +++++++++++++++++++
 1 file changed, 78 insertions(+)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index aa858e808edf..39c32296d84a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -309,12 +309,33 @@ case class InsertIntoHiveTable(
     @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
     saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
 
+    val pathnames = ArrayBuffer[String]()
     val outputPath = FileOutputFormat.getOutputPath(jobConf)
     // TODO: Correctly set holdDDLTime.
     // In most of the time, we should have holdDDLTime = false.
     // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
     val holdDDLTime = false
     if (partition.nonEmpty) {
+      val fs = FileSystem.get(hadoopConf)
+      if(fs.exists(tmpLocation)){
+        val toStat = fs.getFileStatus(tmpLocation)
+        if(toStat.isDirectory()) {
+          val childrenfs = fs.listStatus(tmpLocation)
+          val pathname = childrenfs.map { x => x.getPath.getName }
+          if(partitionColumnNames.length == 1){
+            pathnames ++= pathname
+          } else if (partitionColumnNames.length == 2){
+            childrenfs.map { x => {
+              val firstpar = x.getPath.getName
+              if(x.isDirectory()){
+                val childrenfs = fs.listStatus(x.getPath)
+                val pathname1 = childrenfs.map { x1 => firstpar + "/" + x1.getPath.getName }
+                pathnames ++= pathname1
+              }
+            } }
+          }
+        }
+      }
       if (numDynamicPartitions > 0) {
         externalCatalog.loadDynamicPartitions(
           db = table.catalogTable.database,
@@ -390,6 +411,63 @@ case class InsertIntoHiveTable(
         logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
     }
 
+    var source = ""
+    if(outputClass.getName.equalsIgnoreCase("org.apache.hadoop.hive.ql.io.orc.OrcSerde$OrcSerdeRow")){
+      source = "orc"
+    }else if (outputClass.getName.equalsIgnoreCase("org.apache.hadoop.io.Text")){
+      source = "text"
+    } else if (outputClass.getName.equalsIgnoreCase("org.apache.parquet.hadoop.ParquetOutputCommitter")){
+      source = "parquet"
+    }
+    //wuzl add merger files
+    if (source.equals("orc") || source.equals("text")){
+      if (partition.nonEmpty && partitionColumnNames.length >= 3){}
+      else {
+        val tmppath = tableLocation + "_merges"
+        val blocksize = sqlContext.sparkContext.conf.getLong("hive.exec.orc.default.stripe.size", 268435456)
+        // non partition
+        if(partition.size == 0){
+          val fs = FileSystem.get(hadoopConf)
+
+          val filesize = fs.getContentSummary(tableLocation).getLength()
+          val numpartition = (filesize / blocksize + 1).toInt
+          val df = sqlContext.read.format(source).load(tableLocation.toString())
+            .repartition(numpartition)
+          df.write.format(source).save(tmppath)
+
+          val srcPath = new Path(tmppath)
+          val dstPath = tableLocation
+
+          if(fs.exists(dstPath)) fs.delete(dstPath, true)
+          fs.rename(srcPath, dstPath)
+          val successpath = new Path(tableLocation + "/_SUCCESS")
+          if(fs.exists(successpath)) fs.delete(successpath, true)
+
+        }else{
+          //have partition ,but Supports 2 partition fields
+          val fs = FileSystem.get(hadoopConf)
+          pathnames.foreach {
+            x => {
+              val partitonTmppath= tableLocation.toString() + "/" + x
+              val partionPath = partitonTmppath + "_merges"
+              val filesize = fs.getContentSummary(new Path(partitonTmppath)).getLength()
+              val numpartition = (filesize / blocksize + 1).toInt
+              val df = sqlContext.read.format(source).load(partitonTmppath).repartition(numpartition)
+              df.write.format(source).save(partionPath)
+
+              val srcPath = new Path(partionPath)
+              val dstPath = new Path(partitonTmppath)
+
+              if(fs.exists(dstPath)) fs.delete(dstPath, true)
+              fs.rename(srcPath, dstPath)
+              val successpath = new Path(partitonTmppath + "/_SUCCESS")
+              if(fs.exists(successpath)) fs.delete(successpath, true)
+            }
+          }
+        }
+      }
+    }
+    //wuzl end
     // Invalidate the cache.
     sqlContext.sharedState.cacheManager.invalidateCache(table)
     sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
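
The merge pass added above reduces to one operation per table or partition
directory: measure the directory's total size, derive a target file count from
the configured stripe size, rewrite the data into a temporary "_merges"
directory with that many files, then swap the result into place. A minimal
standalone sketch of that per-directory step (the MergeSmallFilesSketch object,
the mergeDirectory name and the 256 MB default are illustrative, not part of
the patch; coalesce is used here, which the next commit also switches to):

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.sql.SparkSession

    object MergeSmallFilesSketch {
      // Rewrite one ORC or text directory into roughly targetBytes-sized files:
      // size the output by bytes, write to "<dir>_merges", then swap it in.
      def mergeDirectory(
          spark: SparkSession,
          source: String, // "orc" or "text"
          dir: String,
          targetBytes: Long = 256L * 1024 * 1024): Unit = {
        val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
        val dirPath = new Path(dir)
        val tmpPath = new Path(dir + "_merges")

        // One output file per targetBytes of input, at least one.
        val dirBytes = fs.getContentSummary(dirPath).getLength
        val numFiles = (dirBytes / targetBytes + 1).toInt

        spark.read.format(source).load(dir)
          .coalesce(numFiles)
          .write.format(source).save(tmpPath.toString)

        // Swap the merged output into place and drop the _SUCCESS marker.
        if (fs.exists(dirPath)) fs.delete(dirPath, true)
        fs.rename(tmpPath, dirPath)
        fs.delete(new Path(dirPath, "_SUCCESS"), false)
      }
    }
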
From 577293be9cea0874bb82dee6b7e54727b4ee8b22 Mon Sep 17 00:00:00 2001
From: wuzhilon <358471559@qq.com>
Date: Fri, 21 Jul 2017 19:54:08 +0800
Subject: [PATCH 2/3] Update InsertIntoHiveTable.scala

---
 .../spark/sql/hive/execution/InsertIntoHiveTable.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 39c32296d84a..5372ca4cf91e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -432,7 +432,7 @@ case class InsertIntoHiveTable(
           val filesize = fs.getContentSummary(tableLocation).getLength()
           val numpartition = (filesize / blocksize + 1).toInt
           val df = sqlContext.read.format(source).load(tableLocation.toString())
-            .repartition(numpartition)
+            .coalesce(numpartition)
           df.write.format(source).save(tmppath)
 
           val srcPath = new Path(tmppath)
@@ -442,7 +442,7 @@ case class InsertIntoHiveTable(
           fs.rename(srcPath, dstPath)
           val successpath = new Path(tableLocation + "/_SUCCESS")
           if(fs.exists(successpath)) fs.delete(successpath, true)
-
+          if(fs.exists(srcPath)) fs.delete(srcPath, true)
         }else{
           //have partition ,but Supports 2 partition fields
           val fs = FileSystem.get(hadoopConf)
@@ -452,7 +452,7 @@ case class InsertIntoHiveTable(
               val partionPath = partitonTmppath + "_merges"
              val filesize = fs.getContentSummary(new Path(partitonTmppath)).getLength()
               val numpartition = (filesize / blocksize + 1).toInt
-              val df = sqlContext.read.format(source).load(partitonTmppath).repartition(numpartition)
+              val df = sqlContext.read.format(source).load(partitonTmppath).coalesce(numpartition)
               df.write.format(source).save(partionPath)
 
               val srcPath = new Path(partionPath)
@@ -462,6 +462,7 @@ case class InsertIntoHiveTable(
               fs.rename(srcPath, dstPath)
               val successpath = new Path(partitonTmppath + "/_SUCCESS")
               if(fs.exists(successpath)) fs.delete(successpath, true)
+              if(fs.exists(srcPath)) fs.delete(srcPath, true)
             }
           }
         }
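
The switch above from repartition to coalesce matters because the goal is only
to reduce the number of output files: coalesce(n) narrows the existing
partitions without a shuffle, while repartition(n) redistributes every row. A
small illustration of the difference (the /tmp output paths are placeholders):

    import org.apache.spark.sql.SparkSession

    object CoalesceVsRepartition {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("coalesce-vs-repartition").getOrCreate()

        // 200 small input partitions, standing in for a directory of small files.
        val df = spark.range(0, 1000000, 1, numPartitions = 200).toDF("id")

        // coalesce: narrows to 4 partitions with no shuffle; cheap, which is why
        // the merge step prefers it when only shrinking the file count.
        df.coalesce(4).write.mode("overwrite").format("orc").save("/tmp/coalesced")

        // repartition: full shuffle into 4 evenly sized partitions; more work,
        // mainly useful when rebalancing skew or increasing parallelism.
        df.repartition(4).write.mode("overwrite").format("orc").save("/tmp/repartitioned")

        spark.stop()
      }
    }
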
From ce166e1d284854b91de752a5cb8b24a260b92999 Mon Sep 17 00:00:00 2001
From: wuzhilon <358471559@qq.com>
Date: Fri, 21 Jul 2017 19:54:44 +0800
Subject: [PATCH 3/3] Update InsertIntoHiveTable.scala

---
 .../apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 5372ca4cf91e..8c847b4a68f4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -419,7 +419,7 @@ case class InsertIntoHiveTable(
     } else if (outputClass.getName.equalsIgnoreCase("org.apache.parquet.hadoop.ParquetOutputCommitter")){
       source = "parquet"
     }
-    //wuzl add merger files
+    //add merger files
     if (source.equals("orc") || source.equals("text")){
       if (partition.nonEmpty && partitionColumnNames.length >= 3){}
       else {
@@ -468,7 +468,7 @@ case class InsertIntoHiveTable(
         }
       }
     }
-    //wuzl end
+    // end
     // Invalidate the cache.
     sqlContext.sharedState.cacheManager.invalidateCache(table)
     sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
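
The size of the merged files is driven by hive.exec.orc.default.stripe.size,
read through the Spark conf with a default of 268435456 bytes (256 MB), so it
would be tuned when the session is created. An illustrative session under that
assumption (the table names and the 128 MB value are placeholders):

    import org.apache.spark.sql.SparkSession

    object InsertWithMergeExample {
      def main(args: Array[String]): Unit = {
        // With these patches applied, an INSERT into an ORC or text Hive table
        // with at most two partition columns would be followed by the
        // small-file merge pass, sized by the stripe-size setting below.
        val spark = SparkSession.builder()
          .appName("insert-with-merge")
          .config("hive.exec.orc.default.stripe.size", 134217728L) // 128 MB target
          .enableHiveSupport()
          .getOrCreate()

        spark.sql(
          "INSERT INTO TABLE events PARTITION (dt = '2017-07-21') " +
            "SELECT * FROM staging_events")

        spark.stop()
      }
    }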