From 217815e56f5c81e7a14b90de9bde5298ddd6a694 Mon Sep 17 00:00:00 2001 From: LantaoJin Date: Thu, 14 Nov 2019 23:46:38 +0800 Subject: [PATCH] [SPARK-29899][SQL] Recursively load data in Hive table via TBLPROPERTIES --- .../spark/sql/hive/HiveMetastoreCatalog.scala | 9 +++++-- .../sql/hive/HiveMetastoreCatalogSuite.scala | 26 +++++++++++++++++++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 5ad2caba07fc..e3bb18c493ba 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -125,18 +125,23 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log private def isParquetProperty(key: String) = key.startsWith("parquet.") || key.contains(".parquet.") + private def isRecursiveFileLookupProperty(key: String) = + key.equalsIgnoreCase("recursiveFileLookup") + def convert(relation: HiveTableRelation): LogicalRelation = { val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT) // Consider table and storage properties. For properties existing in both sides, storage // properties will supersede table properties. if (serde.contains("parquet")) { - val options = relation.tableMeta.properties.filterKeys(isParquetProperty) ++ + val options = relation.tableMeta.properties.filterKeys(p => + isParquetProperty(p) || isRecursiveFileLookupProperty(p)) ++ relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA -> SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString) convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet") } else { - val options = relation.tableMeta.properties.filterKeys(isOrcProperty) ++ + val options = relation.tableMeta.properties.filterKeys(p => + isOrcProperty(p) || isRecursiveFileLookupProperty(p)) ++ relation.tableMeta.storage.properties if (SQLConf.get.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") { convertToLogicalRelation( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 0e45e18c4b17..e51cac1eb316 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -358,4 +358,30 @@ class DataSourceWithHiveMetastoreCatalogSuite Seq(table("src").count().toString)) } } + + test("SPARK-29899: Recursively load data in table via TBLPROPERTIES") { + withTempPath(dir => { + val baseDir = s"${dir.getCanonicalFile.toURI.toString}/path1" + val innerDir = s"$baseDir/path2/path3" + spark.range(3).selectExpr("id").write.parquet(innerDir) + withTable("test1", "test2") { + withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true") { + spark.sql( + s""" + |CREATE TABLE test1 (id bigint) + |STORED AS PARQUET LOCATION '$baseDir' + |TBLPROPERTIES ( + | 'recursiveFileLookup'='true') + |""".stripMargin) + checkAnswer(spark.table("test1"), Seq(Row(0), Row(1), Row(2))) + spark.sql( + s""" + |CREATE TABLE test2 (id bigint) + |STORED AS PARQUET LOCATION '$baseDir' + |""".stripMargin) + checkAnswer(spark.table("test2"), Seq()) + } + } + }) + } }