From 95aa9317b228220961074c04df06e1d08d2d8556 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Mon, 6 Mar 2017 17:16:05 +0800
Subject: [PATCH 1/5] [SPARK-19833][SQL] remove SQLConf.HIVE_VERIFY_PARTITION_PATH; always return an empty result when the location does not exist

---
 .../apache/spark/sql/internal/SQLConf.scala   |  8 ---
 .../apache/spark/sql/hive/TableReader.scala   | 56 +++++++++---------
 .../spark/sql/hive/QueryPartitionSuite.scala  | 58 +++++++++----------
 3 files changed, 54 insertions(+), 68 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 461dfe3a66e1..3616cdf62372 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -264,12 +264,6 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
-  val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
-    .doc("When true, check all the partition paths under the table\'s root directory " +
-         "when reading data stored in HDFS.")
-    .booleanConf
-    .createWithDefault(false)
-
   val HIVE_METASTORE_PARTITION_PRUNING =
     buildConf("spark.sql.hive.metastorePartitionPruning")
       .doc("When true, some predicates will be pushed down into the Hive metastore so that " +
@@ -768,8 +762,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
 
-  def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
-
   def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
   def manageFilesourcePartitions: Boolean = getConf(HIVE_MANAGE_FILESOURCE_PARTITIONS)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 16c1103dd1ea..e4562748f796 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -159,36 +159,32 @@ class HadoopTableReader(
   def verifyPartitionPath(
       partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
       Map[HivePartition, Class[_ <: Deserializer]] = {
-    if (!sparkSession.sessionState.conf.verifyPartitionPath) {
-      partitionToDeserializer
-    } else {
-      var existPathSet = collection.mutable.Set[String]()
-      var pathPatternSet = collection.mutable.Set[String]()
-      partitionToDeserializer.filter {
-        case (partition, partDeserializer) =>
-          def updateExistPathSetByPathPattern(pathPatternStr: String) {
-            val pathPattern = new Path(pathPatternStr)
-            val fs = pathPattern.getFileSystem(hadoopConf)
-            val matches = fs.globStatus(pathPattern)
-            matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
-          }
-          // convert /demo/data/year/month/day to /demo/data/*/*/*/
-          def getPathPatternByPath(parNum: Int, tempPath: Path): String = {
-            var path = tempPath
-            for (i <- (1 to parNum)) path = path.getParent
-            val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/")
-            path.toString + tails
-          }
-
-          val partPath = partition.getDataLocation
-          val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size();
-          var pathPatternStr = getPathPatternByPath(partNum, partPath)
-          if (!pathPatternSet.contains(pathPatternStr)) {
-            pathPatternSet += pathPatternStr
-            updateExistPathSetByPathPattern(pathPatternStr)
-          }
-          existPathSet.contains(partPath.toString)
-      }
+    var existPathSet = 
collection.mutable.Set[String]() + var pathPatternSet = collection.mutable.Set[String]() + partitionToDeserializer.filter { + case (partition, partDeserializer) => + def updateExistPathSetByPathPattern(pathPatternStr: String) { + val pathPattern = new Path(pathPatternStr) + val fs = pathPattern.getFileSystem(hadoopConf) + val matches = fs.globStatus(pathPattern) + matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString) + } + // convert /demo/data/year/month/day to /demo/data/*/*/*/ + def getPathPatternByPath(parNum: Int, tempPath: Path): String = { + var path = tempPath + for (i <- (1 to parNum)) path = path.getParent + val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/") + path.toString + tails + } + + val partPath = partition.getDataLocation + val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size(); + var pathPatternStr = getPathPatternByPath(partNum, partPath) + if (!pathPatternSet.contains(pathPatternStr)) { + pathPatternSet += pathPatternStr + updateExistPathSetByPathPattern(pathPatternStr) + } + existPathSet.contains(partPath.toString) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala index 43b6bf5feeb6..1a258b3b03d6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala @@ -32,40 +32,38 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl import spark.implicits._ test("SPARK-5068: query data when path doesn't exist") { - withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "true")) { - val testData = sparkContext.parallelize( - (1 to 10).map(i => TestData(i, i.toString))).toDF() - testData.createOrReplaceTempView("testData") + val testData = sparkContext.parallelize( + (1 to 10).map(i => TestData(i, i.toString))).toDF() + testData.createOrReplaceTempView("testData") - val tmpDir = Files.createTempDir() - // create the table for test - sql(s"CREATE TABLE table_with_partition(key int,value string) " + - s"PARTITIONED by (ds string) location '${tmpDir.toURI}' ") - sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') " + - "SELECT key,value FROM testData") - sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') " + - "SELECT key,value FROM testData") - sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') " + - "SELECT key,value FROM testData") - sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') " + - "SELECT key,value FROM testData") + val tmpDir = Files.createTempDir() + // create the table for test + sql(s"CREATE TABLE table_with_partition(key int,value string) " + + s"PARTITIONED by (ds string) location '${tmpDir.toURI}' ") + sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') " + + "SELECT key,value FROM testData") + sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') " + + "SELECT key,value FROM testData") + sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') " + + "SELECT key,value FROM testData") + sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') " + + "SELECT key,value FROM testData") - // test for the exist path - checkAnswer(sql("select key,value from table_with_partition"), - testData.toDF.collect ++ testData.toDF.collect - ++ testData.toDF.collect ++ testData.toDF.collect) + // test for the exist path + checkAnswer(sql("select key,value from 
table_with_partition"), + testData.toDF.collect ++ testData.toDF.collect + ++ testData.toDF.collect ++ testData.toDF.collect) - // delete the path of one partition - tmpDir.listFiles - .find { f => f.isDirectory && f.getName().startsWith("ds=") } - .foreach { f => Utils.deleteRecursively(f) } + // delete the path of one partition + tmpDir.listFiles + .find { f => f.isDirectory && f.getName().startsWith("ds=") } + .foreach { f => Utils.deleteRecursively(f) } - // test for after delete the path - checkAnswer(sql("select key,value from table_with_partition"), - testData.toDF.collect ++ testData.toDF.collect ++ testData.toDF.collect) + // test for after delete the path + checkAnswer(sql("select key,value from table_with_partition"), + testData.toDF.collect ++ testData.toDF.collect ++ testData.toDF.collect) - sql("DROP TABLE IF EXISTS table_with_partition") - sql("DROP TABLE IF EXISTS createAndInsertTest") - } + sql("DROP TABLE IF EXISTS table_with_partition") + sql("DROP TABLE IF EXISTS createAndInsertTest") } } From 8128567d2f22247331a8fb15a673ba01614848c1 Mon Sep 17 00:00:00 2001 From: windpiger Date: Mon, 6 Mar 2017 20:05:03 +0800 Subject: [PATCH 2/5] fix a bug --- .../org/apache/spark/sql/hive/TableReader.scala | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index e4562748f796..d1d8b1f83932 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -170,16 +170,21 @@ class HadoopTableReader( matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString) } // convert /demo/data/year/month/day to /demo/data/*/*/*/ - def getPathPatternByPath(parNum: Int, tempPath: Path): String = { - var path = tempPath - for (i <- (1 to parNum)) path = path.getParent - val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/") - path.toString + tails + def getPathPatternByPath(parNum: Int, tempPath: Path, partitionName: String): String = { + // if the partition path does not end with partition name, we should not + // generate the pattern, return the partition path directly + if (tempPath.toString.endsWith(partitionName)) { + var path = tempPath + for (i <- (1 to parNum)) path = path.getParent + val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/") + path.toString + tails + } else tempPath.toString } val partPath = partition.getDataLocation + val partitionName = partition.getName val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size(); - var pathPatternStr = getPathPatternByPath(partNum, partPath) + var pathPatternStr = getPathPatternByPath(partNum, partPath, partitionName) if (!pathPatternSet.contains(pathPatternStr)) { pathPatternSet += pathPatternStr updateExistPathSetByPathPattern(pathPatternStr) From 22b1f538b31c50a6328afa9051e1e0ddc29da072 Mon Sep 17 00:00:00 2001 From: windpiger Date: Tue, 7 Mar 2017 13:42:03 +0800 Subject: [PATCH 3/5] add log to find why jenkins failed --- .../scala/org/apache/spark/sql/hive/TableReader.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index d1d8b1f83932..a7cb31912c56 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -185,10 +185,16 @@ class HadoopTableReader(
         val partitionName = partition.getName
         val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size();
         var pathPatternStr = getPathPatternByPath(partNum, partPath, partitionName)
+        // scalastyle:off println
+        println(s"===$partPath, $partitionName, $partNum, $pathPatternStr===")
         if (!pathPatternSet.contains(pathPatternStr)) {
           pathPatternSet += pathPatternStr
           updateExistPathSetByPathPattern(pathPatternStr)
         }
+        // scalastyle:off println
+        println(s"=1==${existPathSet.size}, ${partPath.toString}, " +
+          s"${existPathSet.contains(partPath.toString)}===")
+
         existPathSet.contains(partPath.toString)
     }
   }
@@ -198,6 +204,11 @@ class HadoopTableReader(
     val partDesc = Utilities.getPartitionDesc(partition)
     val partPath = partition.getDataLocation
     val inputPathStr = applyFilterIfNeeded(partPath, filterOpt)
+
+    // scalastyle:off println
+    println(s"=1==${inputPathStr}, ${partPath.toString}, " +
+      s"${if (filterOpt.isDefined) filterOpt.get.toString}===")
+
     val ifc = partDesc.getInputFileFormatClass
       .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
     // Get partition field info

From 262e2f2221bb0565496c6af3f6512e730e529f83 Mon Sep 17 00:00:00 2001
From: windpiger
Date: Tue, 7 Mar 2017 20:55:46 +0800
Subject: [PATCH 4/5] fix failing test: check each partition path directly with fs.exists

---
 .../apache/spark/sql/hive/TableReader.scala | 36 ++-----------------
 1 file changed, 2 insertions(+), 34 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index a7cb31912c56..de0b3292b436 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -159,43 +159,11 @@ class HadoopTableReader(
   def verifyPartitionPath(
       partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
       Map[HivePartition, Class[_ <: Deserializer]] = {
-    var existPathSet = collection.mutable.Set[String]()
-    var pathPatternSet = collection.mutable.Set[String]()
     partitionToDeserializer.filter {
       case (partition, partDeserializer) =>
-        def updateExistPathSetByPathPattern(pathPatternStr: String) {
-          val pathPattern = new Path(pathPatternStr)
-          val fs = pathPattern.getFileSystem(hadoopConf)
-          val matches = fs.globStatus(pathPattern)
-          matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
-        }
-        // convert /demo/data/year/month/day to /demo/data/*/*/*/
-        def getPathPatternByPath(parNum: Int, tempPath: Path, partitionName: String): String = {
-          // if the partition path does not end with the partition name, do not
-          // generate the pattern; return the partition path directly
-          if (tempPath.toString.endsWith(partitionName)) {
-            var path = tempPath
-            for (i <- (1 to parNum)) path = path.getParent
-            val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/")
-            path.toString + tails
-          } else tempPath.toString
-        }
-
         val partPath = partition.getDataLocation
-        val partitionName = partition.getName
-        val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size();
-        var pathPatternStr = getPathPatternByPath(partNum, partPath, partitionName)
-        // scalastyle:off println
-        println(s"===$partPath, $partitionName, $partNum, $pathPatternStr===")
-        if (!pathPatternSet.contains(pathPatternStr)) {
-          pathPatternSet += pathPatternStr
-          updateExistPathSetByPathPattern(pathPatternStr)
-        }
-        // scalastyle:off println
-        
println(s"=1==${existPathSet.size}, ${partPath.toString}, " + - s"${existPathSet.contains(partPath.toString)}===") - - existPathSet.contains(partPath.toString) + val fs = partPath.getFileSystem(hadoopConf) + fs.exists(partPath) } } From 3a15e5d709aba611a95136a9b59f65a08137d300 Mon Sep 17 00:00:00 2001 From: windpiger Date: Tue, 7 Mar 2017 21:51:21 +0800 Subject: [PATCH 5/5] remove log --- .../main/scala/org/apache/spark/sql/hive/TableReader.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala index de0b3292b436..b2184965978d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala @@ -172,11 +172,6 @@ class HadoopTableReader( val partDesc = Utilities.getPartitionDesc(partition) val partPath = partition.getDataLocation val inputPathStr = applyFilterIfNeeded(partPath, filterOpt) - - // scalastyle:off println - println(s"=1==${inputPathStr}, ${partPath.toString}, " + - s"${if (filterOpt.isDefined) filterOpt.get.toString}===") - val ifc = partDesc.getInputFileFormatClass .asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]] // Get partition field info