diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 461dfe3a66e1..3616cdf62372 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -264,12 +264,6 @@ object SQLConf {
     .booleanConf
     .createWithDefault(false)
 
-  val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
-    .doc("When true, check all the partition paths under the table\'s root directory " +
-      "when reading data stored in HDFS.")
-    .booleanConf
-    .createWithDefault(false)
-
   val HIVE_METASTORE_PARTITION_PRUNING =
     buildConf("spark.sql.hive.metastorePartitionPruning")
       .doc("When true, some predicates will be pushed down into the Hive metastore so that " +
@@ -768,8 +762,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def orcFilterPushDown: Boolean = getConf(ORC_FILTER_PUSHDOWN_ENABLED)
 
-  def verifyPartitionPath: Boolean = getConf(HIVE_VERIFY_PARTITION_PATH)
-
   def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
   def manageFilesourcePartitions: Boolean = getConf(HIVE_MANAGE_FILESOURCE_PARTITIONS)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 16c1103dd1ea..b2184965978d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -159,36 +159,11 @@ class HadoopTableReader(
   def verifyPartitionPath(
       partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
       Map[HivePartition, Class[_ <: Deserializer]] = {
-    if (!sparkSession.sessionState.conf.verifyPartitionPath) {
-      partitionToDeserializer
-    } else {
-      var existPathSet = collection.mutable.Set[String]()
-      var pathPatternSet = collection.mutable.Set[String]()
-      partitionToDeserializer.filter {
-        case (partition, partDeserializer) =>
-          def updateExistPathSetByPathPattern(pathPatternStr: String) {
-            val pathPattern = new Path(pathPatternStr)
-            val fs = pathPattern.getFileSystem(hadoopConf)
-            val matches = fs.globStatus(pathPattern)
-            matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
-          }
-          // convert /demo/data/year/month/day to /demo/data/*/*/*/
-          def getPathPatternByPath(parNum: Int, tempPath: Path): String = {
-            var path = tempPath
-            for (i <- (1 to parNum)) path = path.getParent
-            val tails = (1 to parNum).map(_ => "*").mkString("/", "/", "/")
-            path.toString + tails
-          }
-
-          val partPath = partition.getDataLocation
-          val partNum = Utilities.getPartitionDesc(partition).getPartSpec.size();
-          var pathPatternStr = getPathPatternByPath(partNum, partPath)
-          if (!pathPatternSet.contains(pathPatternStr)) {
-            pathPatternSet += pathPatternStr
-            updateExistPathSetByPathPattern(pathPatternStr)
-          }
-          existPathSet.contains(partPath.toString)
-      }
+    partitionToDeserializer.filter {
+      case (partition, partDeserializer) =>
+        val partPath = partition.getDataLocation
+        val fs = partPath.getFileSystem(hadoopConf)
+        fs.exists(partPath)
     }
   }
 
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
index 43b6bf5feeb6..1a258b3b03d6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/QueryPartitionSuite.scala
@@ -32,40 +32,38 @@ class QueryPartitionSuite extends QueryTest with SQLTestUtils with TestHiveSingl
   import spark.implicits._
 
   test("SPARK-5068: query data when path doesn't exist") {
-    withSQLConf((SQLConf.HIVE_VERIFY_PARTITION_PATH.key, "true")) {
-      val testData = sparkContext.parallelize(
-        (1 to 10).map(i => TestData(i, i.toString))).toDF()
-      testData.createOrReplaceTempView("testData")
+    val testData = sparkContext.parallelize(
+      (1 to 10).map(i => TestData(i, i.toString))).toDF()
+    testData.createOrReplaceTempView("testData")
 
-      val tmpDir = Files.createTempDir()
-      // create the table for test
-      sql(s"CREATE TABLE table_with_partition(key int,value string) " +
-        s"PARTITIONED by (ds string) location '${tmpDir.toURI}' ")
-      sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') " +
-        "SELECT key,value FROM testData")
-      sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') " +
-        "SELECT key,value FROM testData")
-      sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') " +
-        "SELECT key,value FROM testData")
-      sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') " +
-        "SELECT key,value FROM testData")
+    val tmpDir = Files.createTempDir()
+    // create the table for test
+    sql(s"CREATE TABLE table_with_partition(key int,value string) " +
+      s"PARTITIONED by (ds string) location '${tmpDir.toURI}' ")
+    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='1') " +
+      "SELECT key,value FROM testData")
+    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='2') " +
+      "SELECT key,value FROM testData")
+    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='3') " +
+      "SELECT key,value FROM testData")
+    sql("INSERT OVERWRITE TABLE table_with_partition partition (ds='4') " +
+      "SELECT key,value FROM testData")
 
-      // test for the exist path
-      checkAnswer(sql("select key,value from table_with_partition"),
-        testData.toDF.collect ++ testData.toDF.collect
-          ++ testData.toDF.collect ++ testData.toDF.collect)
+    // test for the exist path
+    checkAnswer(sql("select key,value from table_with_partition"),
+      testData.toDF.collect ++ testData.toDF.collect
+        ++ testData.toDF.collect ++ testData.toDF.collect)
 
-      // delete the path of one partition
-      tmpDir.listFiles
-        .find { f => f.isDirectory && f.getName().startsWith("ds=") }
-        .foreach { f => Utils.deleteRecursively(f) }
+    // delete the path of one partition
+    tmpDir.listFiles
+      .find { f => f.isDirectory && f.getName().startsWith("ds=") }
+      .foreach { f => Utils.deleteRecursively(f) }
 
-      // test for after delete the path
-      checkAnswer(sql("select key,value from table_with_partition"),
-        testData.toDF.collect ++ testData.toDF.collect ++ testData.toDF.collect)
+    // test for after delete the path
+    checkAnswer(sql("select key,value from table_with_partition"),
+      testData.toDF.collect ++ testData.toDF.collect ++ testData.toDF.collect)
 
-      sql("DROP TABLE IF EXISTS table_with_partition")
-      sql("DROP TABLE IF EXISTS createAndInsertTest")
-    }
+    sql("DROP TABLE IF EXISTS table_with_partition")
+    sql("DROP TABLE IF EXISTS createAndInsertTest")
   }
 }