From e406792a2a5b69c40864ff16dbdbb534a25e5c2f Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 11 Nov 2015 21:22:56 -0800 Subject: [PATCH 1/7] Partition discovery stops at the root path of the table. --- .../datasources/PartitioningUtils.scala | 40 ++++++- .../apache/spark/sql/sources/interfaces.scala | 18 +++- .../ParquetPartitionDiscoverySuite.scala | 102 ++++++++++++++++-- 3 files changed, 142 insertions(+), 18 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 86bc3a1b6dab2..fc544327aecb7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -75,10 +75,11 @@ private[sql] object PartitioningUtils { private[sql] def parsePartitions( paths: Seq[Path], defaultPartitionName: String, - typeInference: Boolean): PartitionSpec = { + typeInference: Boolean, + rootPaths: Set[Path]): PartitionSpec = { // First, we need to parse every partition's path and see if we can find partition values. val (partitionValues, optBasePaths) = paths.map { path => - parsePartition(path, defaultPartitionName, typeInference) + parsePartition(path, defaultPartitionName, typeInference, rootPaths) }.unzip // We create pairs of (path -> path's partition value) here @@ -152,11 +153,14 @@ private[sql] object PartitioningUtils { private[sql] def parsePartition( path: Path, defaultPartitionName: String, - typeInference: Boolean): (Option[PartitionValues], Option[Path]) = { + typeInference: Boolean, + rootPaths: Set[Path]): (Option[PartitionValues], Option[Path]) = { val columns = ArrayBuffer.empty[(String, Literal)] // Old Hadoop versions don't have `Path.isRoot` var finished = path.getParent == null + // chopped path is the current path that we will use to parse partition column value. var chopped = path + // base path will be the child of chopped in the loop below. var basePath = path while (!finished) { @@ -166,11 +170,37 @@ private[sql] object PartitioningUtils { return (None, None) } + // Let's say chopped is a path of /table/a=1/, chopped.getName will give us a=1. + // Once we get the string, we try to parse it and find the partition column and value. val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference) - maybeColumn.foreach(columns += _) + + // Now, basePath will be /table/a=1/ basePath = chopped + // chopped will be /table/ chopped = chopped.getParent - finished = (maybeColumn.isEmpty && !columns.isEmpty) || chopped.getParent == null + + // Now, we determine if we should continue. + // When we hit any of the following three cases, we will not continue: + // - In this iteration, we could not parse the value of partition column and value, + // i.e. maybeColumn is None, and columns is not empty. At here we check if columns is + // empty to handle cases like /table/a=1/_temporary/something (we need to find a=1 in + // this case). + // - After we get the new chopped, this new chopped represent the path of "/table", i.e. + // chopped.getParent == null. + // - The chopped we used to parse partition column and value (right now, it is basePath), + // is already the root path of a table. For the example of /table/a=1/, /table/ is the + // root path. 
+ finished = + (maybeColumn.isEmpty && !columns.isEmpty) || + chopped.getParent == null || + rootPaths.contains(basePath) + + if (maybeColumn.isDefined && !rootPaths.contains(basePath)) { + // If we can parse the partition column and its value, and the path + // we used for parsing is not the root path, we should append the parsed + // result to columns. + maybeColumn.foreach(columns += _) + } } if (columns.isEmpty) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 5b8841bc154a5..29045a2628a68 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -519,7 +519,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio } /** - * Base paths of this relation. For partitioned relations, it should be either root directories + * Base paths of this relation. For partitioned relations, it should be root directories * of all partition directories. * * @since 1.4.0 @@ -554,12 +554,19 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio } private def discoverPartitions(): PartitionSpec = { + val rootDirs = paths.map { path => + new Path(path) + }.toSet + // We use leaf dirs containing data files to discover the schema. val leafDirs = fileStatusCache.leafDirToChildrenFiles.keys.toSeq userDefinedPartitionColumns match { case Some(userProvidedSchema) if userProvidedSchema.nonEmpty => val spec = PartitioningUtils.parsePartitions( - leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference = false) + leafDirs, + PartitioningUtils.DEFAULT_PARTITION_NAME, + typeInference = false, + rootPaths = rootDirs) // Without auto inference, all of value in the `row` should be null or in StringType, // we need to cast into the data type that user specified. 
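To make the new stopping rule easier to follow before the later patches refine this loop, here is a minimal, self-contained sketch of the walk-up that parsePartition now performs once it knows the table's root paths. It is an illustration only, not the patched code: the helper name, the string-valued columns, and the example paths are hypothetical, and it can be pasted into a Scala REPL with Hadoop on the classpath.

    import org.apache.hadoop.fs.Path
    import scala.collection.mutable.ArrayBuffer

    // Simplified model: walk from a leaf directory up to a known root path,
    // collecting name=value segments as partition columns along the way.
    def collectPartitionColumns(leaf: Path, rootPaths: Set[Path]): Seq[(String, String)] = {
      val columns = ArrayBuffer.empty[(String, String)]
      var current = leaf
      var finished = false
      while (!finished && current != null && !rootPaths.contains(current)) {
        current.getName.split("=", 2) match {
          case Array(name, value) => columns += name -> value
          // A segment that is not of the form name=value ends the walk once at least
          // one column has been collected, mirroring the
          // (maybeColumn.isEmpty && !columns.isEmpty) check in the patch above.
          case _ => finished = columns.nonEmpty
        }
        current = current.getParent
      }
      columns.reverse
    }

    // Discovery stops at the supplied root path, so the "key=value" segment that is
    // part of the table location is no longer misread as a partition column:
    collectPartitionColumns(
      new Path("hdfs://host:9000/path/key=value/table/a=1/b=2"),
      Set(new Path("hdfs://host:9000/path/key=value/table")))
    // => a -> 1, b -> 2 (and nothing for key=value)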
@@ -577,8 +584,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio case _ => // user did not provide a partitioning schema - PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME, - typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled()) + PartitioningUtils.parsePartitions( + leafDirs, + PartitioningUtils.DEFAULT_PARTITION_NAME, + typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled(), + rootPaths = rootDirs) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index 61cc0da50865c..d361455774870 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -66,7 +66,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha "hdfs://host:9000/path/a=10.5/b=hello") var exception = intercept[AssertionError] { - parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) + parsePartitions(paths.map(new Path(_)), defaultPartitionName, true, Set.empty[Path]) } assert(exception.getMessage().contains("Conflicting directory structures detected")) @@ -76,7 +76,37 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha "hdfs://host:9000/path/a=10/b=20", "hdfs://host:9000/path/_temporary/path") - parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) + parsePartitions( + paths.map(new Path(_)), + defaultPartitionName, + true, + Set(new Path("hdfs://host:9000/path/"))) + + // Valid + paths = Seq( + "hdfs://host:9000/path/something=true/table/", + "hdfs://host:9000/path/something=true/table/_temporary", + "hdfs://host:9000/path/something=true/table/a=10/b=20", + "hdfs://host:9000/path/something=true/table/_temporary/path") + + parsePartitions( + paths.map(new Path(_)), + defaultPartitionName, + true, + Set(new Path("hdfs://host:9000/path/something=true/table"))) + + // Valid + paths = Seq( + "hdfs://host:9000/path/table=true/", + "hdfs://host:9000/path/table=true/_temporary", + "hdfs://host:9000/path/table=true/a=10/b=20", + "hdfs://host:9000/path/table=true/_temporary/path") + + parsePartitions( + paths.map(new Path(_)), + defaultPartitionName, + true, + Set(new Path("hdfs://host:9000/path/table=true"))) // Invalid paths = Seq( @@ -85,7 +115,11 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha "hdfs://host:9000/path/path1") exception = intercept[AssertionError] { - parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) + parsePartitions( + paths.map(new Path(_)), + defaultPartitionName, + true, + Set(new Path("hdfs://host:9000/path/"))) } assert(exception.getMessage().contains("Conflicting directory structures detected")) @@ -101,19 +135,24 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha "hdfs://host:9000/tmp/tables/nonPartitionedTable2") exception = intercept[AssertionError] { - parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) + parsePartitions( + paths.map(new Path(_)), + defaultPartitionName, + true, + Set(new Path("hdfs://host:9000/tmp/tables/"))) } assert(exception.getMessage().contains("Conflicting directory structures detected")) } test("parse partition") { 
def check(path: String, expected: Option[PartitionValues]): Unit = { - assert(expected === parsePartition(new Path(path), defaultPartitionName, true)._1) + val actual = parsePartition(new Path(path), defaultPartitionName, true, Set.empty[Path])._1 + assert(expected === actual) } def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = { val message = intercept[T] { - parsePartition(new Path(path), defaultPartitionName, true) + parsePartition(new Path(path), defaultPartitionName, true, Set.empty[Path]) }.getMessage assert(message.contains(expected)) @@ -152,8 +191,17 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha } test("parse partitions") { - def check(paths: Seq[String], spec: PartitionSpec): Unit = { - assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) === spec) + def check( + paths: Seq[String], + spec: PartitionSpec, + rootPaths: Set[Path] = Set.empty[Path]): Unit = { + val actualSpec = + parsePartitions( + paths.map(new Path(_)), + defaultPartitionName, + true, + rootPaths) + assert(actualSpec === spec) } check(Seq( @@ -232,7 +280,9 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha test("parse partitions with type inference disabled") { def check(paths: Seq[String], spec: PartitionSpec): Unit = { - assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, false) === spec) + val actualSpec = + parsePartitions(paths.map(new Path(_)), defaultPartitionName, false, Set.empty[Path]) + assert(actualSpec === spec) } check(Seq( @@ -590,6 +640,40 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha } } + test("SPARK-11678: Partition discovery stops at the root path of the dataset") { + withTempPath { dir => + val tablePath = new File(dir, "key=value") + val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d") + + df.write + .format("parquet") + .partitionBy("b", "c", "d") + .save(tablePath.getCanonicalPath) + + Files.touch(new File(s"${tablePath.getCanonicalPath}/", "_SUCCESS")) + Files.createParentDirs(new File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar")) + + checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath), df) + } + + withTempPath { dir => + val path = new File(dir, "key=value") + val tablePath = new File(path, "table") + + val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d") + + df.write + .format("parquet") + .partitionBy("b", "c", "d") + .save(tablePath.getCanonicalPath) + + Files.touch(new File(s"${tablePath.getCanonicalPath}/", "_SUCCESS")) + Files.createParentDirs(new File(s"${dir.getCanonicalPath}/b=1/c=1/.foo/bar")) + + checkAnswer(sqlContext.read.format("parquet").load(tablePath.getCanonicalPath), df) + } + } + test("listConflictingPartitionColumns") { def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = { val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) => From 28a122752cbcc8ea2abff314441522b12f96a6f9 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 11 Nov 2015 22:19:09 -0800 Subject: [PATCH 2/7] Update tests. 
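The two test updates below follow directly from the new stopping rule: once discovery stops at the path handed to the reader, loading a partition directory such as .../part=1 no longer surfaces "part" as a column, so the filter-pushdown tests must load the parent directory instead. A sketch of the difference, assuming a spark-shell with `sqlContext` in scope and `dir` standing for the suite's temporary directory (the data columns a and b are assumptions taken from the expected rows):

    // Reading the partition directory itself now stops discovery at .../part=1,
    // so only the data columns are exposed.
    val direct = sqlContext.read.parquet(s"$dir/part=1")           // columns: a, b

    // Reading the parent directory still discovers "part" as a partition column,
    // which is what the updated assertions below rely on.
    val discovered = sqlContext.read.parquet(dir.getCanonicalPath)  // columns: a, b, part
    discovered.filter("part = 1")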
--- .../execution/datasources/parquet/ParquetFilterSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index 579dabf73318b..0c4583e0a439e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -294,7 +294,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex // If the "part = 1" filter gets pushed down, this query will throw an exception since // "part" is not a valid column in the actual Parquet file checkAnswer( - sqlContext.read.parquet(path).filter("part = 1"), + sqlContext.read.parquet(dir.getCanonicalPath).filter("part = 1"), (1 to 3).map(i => Row(i, i.toString, 1))) } } @@ -311,7 +311,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex // If the "part = 1" filter gets pushed down, this query will throw an exception since // "part" is not a valid column in the actual Parquet file checkAnswer( - sqlContext.read.parquet(path).filter("a > 0 and (part = 0 or a > 1)"), + sqlContext.read.parquet(dir.getCanonicalPath).filter("a > 0 and (part = 0 or a > 1)"), (2 to 3).map(i => Row(i, i.toString, 1))) } } From df48220ce352851fe990bcb176bd85d0e63f8291 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 12 Nov 2015 13:15:46 -0800 Subject: [PATCH 3/7] Update --- .../datasources/PartitioningUtils.scala | 80 +++++++++---------- 1 file changed, 39 insertions(+), 41 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index fc544327aecb7..309a73851f4d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -73,13 +73,13 @@ private[sql] object PartitioningUtils { * }}} */ private[sql] def parsePartitions( - paths: Seq[Path], - defaultPartitionName: String, - typeInference: Boolean, - rootPaths: Set[Path]): PartitionSpec = { + paths: Seq[Path], + defaultPartitionName: String, + typeInference: Boolean, + basePaths: Set[Path]): PartitionSpec = { // First, we need to parse every partition's path and see if we can find partition values. val (partitionValues, optBasePaths) = paths.map { path => - parsePartition(path, defaultPartitionName, typeInference, rootPaths) + parsePartition(path, defaultPartitionName, typeInference, basePaths) }.unzip // We create pairs of (path -> path's partition value) here @@ -158,48 +158,46 @@ private[sql] object PartitioningUtils { val columns = ArrayBuffer.empty[(String, Literal)] // Old Hadoop versions don't have `Path.isRoot` var finished = path.getParent == null - // chopped path is the current path that we will use to parse partition column value. - var chopped = path - // base path will be the child of chopped in the loop below. - var basePath = path + // currentPath is the current path that we will use to parse partition column value. + var currentPath = path + // base path will be the child of currentPath in the loop below. 
+ var childPath = path while (!finished) { // Sometimes (e.g., when speculative task is enabled), temporary directories may be left - // uncleaned. Here we simply ignore them. - if (chopped.getName.toLowerCase == "_temporary") { + // uncleaned. Here we simply ignore them. + if (currentPath.getName.toLowerCase == "_temporary") { return (None, None) } - // Let's say chopped is a path of /table/a=1/, chopped.getName will give us a=1. - // Once we get the string, we try to parse it and find the partition column and value. - val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference) - - // Now, basePath will be /table/a=1/ - basePath = chopped - // chopped will be /table/ - chopped = chopped.getParent - - // Now, we determine if we should continue. - // When we hit any of the following three cases, we will not continue: - // - In this iteration, we could not parse the value of partition column and value, - // i.e. maybeColumn is None, and columns is not empty. At here we check if columns is - // empty to handle cases like /table/a=1/_temporary/something (we need to find a=1 in - // this case). - // - After we get the new chopped, this new chopped represent the path of "/table", i.e. - // chopped.getParent == null. - // - The chopped we used to parse partition column and value (right now, it is basePath), - // is already the root path of a table. For the example of /table/a=1/, /table/ is the - // root path. - finished = - (maybeColumn.isEmpty && !columns.isEmpty) || - chopped.getParent == null || - rootPaths.contains(basePath) - - if (maybeColumn.isDefined && !rootPaths.contains(basePath)) { - // If we can parse the partition column and its value, and the path - // we used for parsing is not the root path, we should append the parsed - // result to columns. + if (rootPaths.contains(currentPath)) { + finished = true + } else { + // Let's say currentPath is a path of /table/a=1/, currentPath.getName will give us a=1. + // Once we get the string, we try to parse it and find the partition column and value. + val maybeColumn = parsePartitionColumn(currentPath.getName, defaultPartitionName, typeInference) maybeColumn.foreach(columns += _) + + // Now, we determine if we should continue. + // When we hit any of the following three cases, we will not continue: + // - In this iteration, we could not parse the value of partition column and value, + // i.e. maybeColumn is None, and columns is not empty. At here we check if columns is + // empty to handle cases like /table/a=1/_temporary/something (we need to find a=1 in + // this case). + // - After we get the new currentPath, this new currentPath represent the path of "/table", + // i.e. currentPath.getParent == null. + // - The currentPath we used to parse partition column and value (right now, + // it is childPath), is already the root path of a table. For the example of /table/a=1/, + // /table/ is the root path. 
+ finished = + (maybeColumn.isEmpty && !columns.isEmpty) || currentPath.getParent == null + + if (!finished) { + // Now, childPath will be /table/a=1/ + childPath = currentPath + // currentPath will be /table/ + currentPath = currentPath.getParent + } } } @@ -207,7 +205,7 @@ private[sql] object PartitioningUtils { (None, Some(path)) } else { val (columnNames, values) = columns.reverse.unzip - (Some(PartitionValues(columnNames, values)), Some(basePath)) + (Some(PartitionValues(columnNames, values)), Some(currentPath)) } } From 1744624de54cfe40320c3a721cff93f0ebdfe858 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 12 Nov 2015 16:46:46 -0800 Subject: [PATCH 4/7] Add a basePath option to HadoopFsRelations. --- .../datasources/PartitioningUtils.scala | 6 ++- .../datasources/json/JSONRelation.scala | 21 ++++++---- .../datasources/parquet/ParquetRelation.scala | 2 +- .../datasources/text/DefaultSource.scala | 5 ++- .../apache/spark/sql/sources/interfaces.scala | 41 +++++++++++++++---- .../ParquetPartitionDiscoverySuite.scala | 30 ++++++++++++++ .../spark/sql/hive/orc/OrcRelation.scala | 2 +- .../sql/sources/SimpleTextRelation.scala | 2 +- .../sql/sources/hadoopFsRelationSuites.scala | 1 + 9 files changed, 86 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 309a73851f4d0..e0365c8e58368 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -106,7 +106,11 @@ private[sql] object PartitioningUtils { assert( basePaths.distinct.size == 1, "Conflicting directory structures detected. Suspicious paths:\b" + - basePaths.distinct.mkString("\n\t", "\n\t", "\n\n")) + basePaths.distinct.mkString("\n\t", "\n\t", "\n\n") + + "If provided paths are partition directories, please set " + + "\"basePath\" in the options of the data source to specify the " + + "root directory of the table. 
If there are multiple root directories, " + + "please load them separately and then union them.") val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala index 85b52f04c8d01..dca638b7f67a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala @@ -56,13 +56,14 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { val primitivesAsString = parameters.get("primitivesAsString").map(_.toBoolean).getOrElse(false) new JSONRelation( - None, - samplingRatio, - primitivesAsString, - dataSchema, - None, - partitionColumns, - paths)(sqlContext) + inputRDD = None, + samplingRatio = samplingRatio, + primitivesAsString = primitivesAsString, + maybeDataSchema = dataSchema, + maybePartitionSpec = None, + userDefinedPartitionColumns = partitionColumns, + paths = paths, + parameters = parameters)(sqlContext) } } @@ -73,8 +74,10 @@ private[sql] class JSONRelation( val maybeDataSchema: Option[StructType], val maybePartitionSpec: Option[PartitionSpec], override val userDefinedPartitionColumns: Option[StructType], - override val paths: Array[String] = Array.empty[String])(@transient val sqlContext: SQLContext) - extends HadoopFsRelation(maybePartitionSpec) { + override val paths: Array[String] = Array.empty[String], + parameters: Map[String, String] = Map.empty[String, String]) + (@transient val sqlContext: SQLContext) + extends HadoopFsRelation(maybePartitionSpec, parameters) { /** Constraints to be imposed on schema to be stored. 
*/ private def checkConstraints(schema: StructType): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 21337b2932aac..cb0aab8cc0d09 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -109,7 +109,7 @@ private[sql] class ParquetRelation( override val userDefinedPartitionColumns: Option[StructType], parameters: Map[String, String])( val sqlContext: SQLContext) - extends HadoopFsRelation(maybePartitionSpec) + extends HadoopFsRelation(maybePartitionSpec, parameters) with Logging { private[sql] def this( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala index 4b8b8e4e74dad..fbd387bc2ef47 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/text/DefaultSource.scala @@ -71,9 +71,10 @@ class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister { private[sql] class TextRelation( val maybePartitionSpec: Option[PartitionSpec], override val userDefinedPartitionColumns: Option[StructType], - override val paths: Array[String] = Array.empty[String]) + override val paths: Array[String] = Array.empty[String], + parameters: Map[String, String] = Map.empty[String, String]) (@transient val sqlContext: SQLContext) - extends HadoopFsRelation(maybePartitionSpec) { + extends HadoopFsRelation(maybePartitionSpec, parameters) { /** Data schema is always a single column, named "text". */ override def dataSchema: StructType = new StructType().add("value", StringType) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 29045a2628a68..cd1565f87abe2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -414,12 +414,19 @@ abstract class OutputWriter { * @since 1.4.0 */ @Experimental -abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec]) +abstract class HadoopFsRelation private[sql]( + maybePartitionSpec: Option[PartitionSpec], + parameters: Map[String, String]) extends BaseRelation with FileRelation with Logging { override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]") - def this() = this(None) + def this() = this(None, Map.empty[String, String]) + + def this(parameters: Map[String, String]) = this(None, parameters) + + private[sql] def this(maybePartitionSpec: Option[PartitionSpec]) = + this(maybePartitionSpec, Map.empty[String, String]) private val hadoopConf = new Configuration(sqlContext.sparkContext.hadoopConfiguration) @@ -519,13 +526,33 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio } /** - * Base paths of this relation. For partitioned relations, it should be root directories + * Paths of this relation. For partitioned relations, it should be root directories * of all partition directories. 
* * @since 1.4.0 */ def paths: Array[String] + /** + * Contains a set of paths that are considered as the base dirs of the input datasets. + * The partitioning discovery logic will make sure it will stop when it reaches any + * base path. By default, the paths of the dataset provided by users will be base paths. + * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path + * will be `/path/something=true/`. The returned DataFrame will not contain a column of + * `something`. Users do not a option to override the basePath. They can use `basePath` to path + * the new base path to the data source. For the above example, if the user-provided base path + * is `/path/` instead of `/path/something=true`, the returned DataFrame will have the column + * of `something`. + */ + private def basePaths: Set[Path] = { + val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath))) + userDefinedBasePath.getOrElse { + // If the user does not provide basePath, we will just use paths. + val pathSet = paths.toSet + pathSet.map(p => new Path(p)) + } + } + override def inputFiles: Array[String] = cachedLeafStatuses().map(_.getPath.toString).toArray override def sizeInBytes: Long = cachedLeafStatuses().map(_.getLen).sum @@ -554,10 +581,6 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio } private def discoverPartitions(): PartitionSpec = { - val rootDirs = paths.map { path => - new Path(path) - }.toSet - // We use leaf dirs containing data files to discover the schema. val leafDirs = fileStatusCache.leafDirToChildrenFiles.keys.toSeq userDefinedPartitionColumns match { @@ -566,7 +589,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference = false, - rootPaths = rootDirs) + basePaths = basePaths) // Without auto inference, all of value in the `row` should be null or in StringType, // we need to cast into the data type that user specified. 
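Restating the Scaladoc above as a usage sketch (the /path/something=true location is the placeholder path from the comment, and a running SQLContext is assumed):

    // Default behaviour: the provided path is itself treated as a base path, so the
    // "something=true" segment is not turned into a partition column.
    val withoutColumn = sqlContext.read.parquet("/path/something=true/")

    // Passing the new basePath option moves the discovery boundary up to /path/, so
    // the returned DataFrame gains a "something" column.
    val withColumn = sqlContext.read
      .option("basePath", "/path/")
      .parquet("/path/something=true/")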
@@ -588,7 +611,7 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME, typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled(), - rootPaths = rootDirs) + basePaths = basePaths) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala index d361455774870..71e9034d97792 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala @@ -674,6 +674,36 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha } } + test("use basePath to specify the root dir of a partitioned table.") { + withTempPath { dir => + val tablePath = new File(dir, "table") + val df = (1 to 3).map(i => (i, i, i, i)).toDF("a", "b", "c", "d") + + df.write + .format("parquet") + .partitionBy("b", "c", "d") + .save(tablePath.getCanonicalPath) + + val twoPartitionsDF = + sqlContext + .read + .option("basePath", tablePath.getCanonicalPath) + .parquet( + s"${tablePath.getCanonicalPath}/b=1", + s"${tablePath.getCanonicalPath}/b=2") + + checkAnswer(twoPartitionsDF, df.filter("b != 3")) + + intercept[AssertionError] { + sqlContext + .read + .parquet( + s"${tablePath.getCanonicalPath}/b=1", + s"${tablePath.getCanonicalPath}/b=2") + } + } + } + test("listConflictingPartitionColumns") { def makeExpectedMessage(colNameLists: Seq[String], paths: Seq[String]): String = { val conflictingColNameLists = colNameLists.zipWithIndex.map { case (list, index) => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 45de567039760..1136670b7a0eb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -157,7 +157,7 @@ private[sql] class OrcRelation( override val userDefinedPartitionColumns: Option[StructType], parameters: Map[String, String])( @transient val sqlContext: SQLContext) - extends HadoopFsRelation(maybePartitionSpec) + extends HadoopFsRelation(maybePartitionSpec, parameters) with Logging { private[sql] def this( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala index bdc48a383bbbf..01960fd2901b0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala @@ -89,7 +89,7 @@ class SimpleTextRelation( override val userDefinedPartitionColumns: Option[StructType], parameters: Map[String, String])( @transient val sqlContext: SQLContext) - extends HadoopFsRelation { + extends HadoopFsRelation(parameters) { import sqlContext.sparkContext diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index 100b97137cff0..665e87e3e3355 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -486,6 +486,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes val df = sqlContext.read .format(dataSourceName) .option("dataSchema", dataSchema.json) + .option("basePath", file.getCanonicalPath) .load(s"${file.getCanonicalPath}/p1=*/p2=???") val expectedPaths = Set( From 89f21baa307d79cd31cd9d8bd93329c7ecf52e97 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 12 Nov 2015 16:58:07 -0800 Subject: [PATCH 5/7] Format. --- .../sql/execution/datasources/PartitioningUtils.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index e0365c8e58368..ebb99096259a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -73,10 +73,10 @@ private[sql] object PartitioningUtils { * }}} */ private[sql] def parsePartitions( - paths: Seq[Path], - defaultPartitionName: String, - typeInference: Boolean, - basePaths: Set[Path]): PartitionSpec = { + paths: Seq[Path], + defaultPartitionName: String, + typeInference: Boolean, + basePaths: Set[Path]): PartitionSpec = { // First, we need to parse every partition's path and see if we can find partition values. val (partitionValues, optBasePaths) = paths.map { path => parsePartition(path, defaultPartitionName, typeInference, basePaths) From d784a52ee80a9f90c3d6a3698c5b97945ca14253 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 12 Nov 2015 17:34:18 -0800 Subject: [PATCH 6/7] Comments and docs. --- .../datasources/PartitioningUtils.scala | 32 +++++++++---------- .../apache/spark/sql/sources/interfaces.scala | 10 +++--- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index ebb99096259a0..e770ee234fe1a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -136,7 +136,7 @@ private[sql] object PartitioningUtils { /** * Parses a single partition, returns column names and values of each partition column, also - * the base path. For example, given: + * the path when we stop partition discovery. For example, given: * {{{ * path = hdfs://:/path/to/partition/a=42/b=hello/c=3.14 * }}} @@ -149,7 +149,7 @@ private[sql] object PartitioningUtils { * Literal.create("hello", StringType), * Literal.create(3.14, FloatType))) * }}} - * and the base path: + * and the path when we stop the discovery is: * {{{ * /path/to/partition * }}} @@ -158,13 +158,13 @@ private[sql] object PartitioningUtils { path: Path, defaultPartitionName: String, typeInference: Boolean, - rootPaths: Set[Path]): (Option[PartitionValues], Option[Path]) = { + basePaths: Set[Path]): (Option[PartitionValues], Option[Path]) = { val columns = ArrayBuffer.empty[(String, Literal)] // Old Hadoop versions don't have `Path.isRoot` var finished = path.getParent == null // currentPath is the current path that we will use to parse partition column value. 
var currentPath = path - // base path will be the child of currentPath in the loop below. + // childPath will be the child of currentPath in the loop below. var childPath = path while (!finished) { @@ -174,32 +174,32 @@ private[sql] object PartitioningUtils { return (None, None) } - if (rootPaths.contains(currentPath)) { + if (basePaths.contains(currentPath)) { + // If the currentPath is one of base paths. We should stop. finished = true } else { - // Let's say currentPath is a path of /table/a=1/, currentPath.getName will give us a=1. + // Let's say currentPath is a path of "/table/a=1/", currentPath.getName will give us a=1. // Once we get the string, we try to parse it and find the partition column and value. - val maybeColumn = parsePartitionColumn(currentPath.getName, defaultPartitionName, typeInference) + val maybeColumn = + parsePartitionColumn(currentPath.getName, defaultPartitionName, typeInference) maybeColumn.foreach(columns += _) - // Now, we determine if we should continue. - // When we hit any of the following three cases, we will not continue: + // Now, we determine if we should stop. + // When we hit any of the following cases, we will stop: // - In this iteration, we could not parse the value of partition column and value, // i.e. maybeColumn is None, and columns is not empty. At here we check if columns is // empty to handle cases like /table/a=1/_temporary/something (we need to find a=1 in // this case). - // - After we get the new currentPath, this new currentPath represent the path of "/table", - // i.e. currentPath.getParent == null. - // - The currentPath we used to parse partition column and value (right now, - // it is childPath), is already the root path of a table. For the example of /table/a=1/, - // /table/ is the root path. + // - After we get the new currentPath, this new currentPath represent the top level dir + // i.e. currentPath.getParent == null. For the example of "/table/a=1/", + // the top level dir is "/table". finished = (maybeColumn.isEmpty && !columns.isEmpty) || currentPath.getParent == null if (!finished) { - // Now, childPath will be /table/a=1/ + // Now, for the above example, childPath will be "/table/a=1/". childPath = currentPath - // currentPath will be /table/ + // For the above example, currentPath will be "/table/". currentPath = currentPath.getParent } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index cd1565f87abe2..9bab1bd74d8e9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -538,11 +538,11 @@ abstract class HadoopFsRelation private[sql]( * The partitioning discovery logic will make sure it will stop when it reaches any * base path. By default, the paths of the dataset provided by users will be base paths. * For example, if a user uses `sqlContext.read.parquet("/path/something=true/")`, the base path - * will be `/path/something=true/`. The returned DataFrame will not contain a column of - * `something`. Users do not a option to override the basePath. They can use `basePath` to path - * the new base path to the data source. For the above example, if the user-provided base path - * is `/path/` instead of `/path/something=true`, the returned DataFrame will have the column - * of `something`. + * will be `/path/something=true/`, and the returned DataFrame will not contain a column of + * `something`. 
If users want to override the basePath. They can set `basePath` in the options + * to pass the new base path to the data source. + * For the above example, if the user-provided base path is `/path/`, the returned + * DataFrame will have the column of `something`. */ private def basePaths: Set[Path] = { val userDefinedBasePath = parameters.get("basePath").map(basePath => Set(new Path(basePath))) From 240bcf3c226d0e71601d854aaf34b136caf7f547 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Thu, 12 Nov 2015 21:58:41 -0800 Subject: [PATCH 7/7] Comments. --- .../datasources/PartitioningUtils.scala | 16 ++++++---------- .../apache/spark/sql/sources/interfaces.scala | 4 ++++ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index e770ee234fe1a..81962f8d63789 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -78,7 +78,7 @@ private[sql] object PartitioningUtils { typeInference: Boolean, basePaths: Set[Path]): PartitionSpec = { // First, we need to parse every partition's path and see if we can find partition values. - val (partitionValues, optBasePaths) = paths.map { path => + val (partitionValues, optDiscoveredBasePaths) = paths.map { path => parsePartition(path, defaultPartitionName, typeInference, basePaths) }.unzip @@ -102,11 +102,11 @@ private[sql] object PartitioningUtils { // It will be recognised as conflicting directory structure: // "hdfs://host:9000/invalidPath" // "hdfs://host:9000/path" - val basePaths = optBasePaths.flatMap(x => x) + val disvoeredBasePaths = optDiscoveredBasePaths.flatMap(x => x) assert( - basePaths.distinct.size == 1, + disvoeredBasePaths.distinct.size == 1, "Conflicting directory structures detected. Suspicious paths:\b" + - basePaths.distinct.mkString("\n\t", "\n\t", "\n\n") + + disvoeredBasePaths.distinct.mkString("\n\t", "\n\t", "\n\n") + "If provided paths are partition directories, please set " + "\"basePath\" in the options of the data source to specify the " + "root directory of the table. If there are multiple root directories, " + @@ -151,7 +151,7 @@ private[sql] object PartitioningUtils { * }}} * and the path when we stop the discovery is: * {{{ - * /path/to/partition + * hdfs://:/path/to/partition * }}} */ private[sql] def parsePartition( @@ -163,9 +163,7 @@ private[sql] object PartitioningUtils { // Old Hadoop versions don't have `Path.isRoot` var finished = path.getParent == null // currentPath is the current path that we will use to parse partition column value. - var currentPath = path - // childPath will be the child of currentPath in the loop below. - var childPath = path + var currentPath: Path = path while (!finished) { // Sometimes (e.g., when speculative task is enabled), temporary directories may be left @@ -197,8 +195,6 @@ private[sql] object PartitioningUtils { (maybeColumn.isEmpty && !columns.isEmpty) || currentPath.getParent == null if (!finished) { - // Now, for the above example, childPath will be "/table/a=1/". - childPath = currentPath // For the above example, currentPath will be "/table/". 
currentPath = currentPath.getParent } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 9bab1bd74d8e9..c26f4b3803765 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -550,6 +550,10 @@ abstract class HadoopFsRelation private[sql]( // If the user does not provide basePath, we will just use paths. val pathSet = paths.toSet pathSet.map(p => new Path(p)) + }.map { hdfsPath => + // Make the path qualified (consistent with listLeafFiles and listLeafFilesInParallel). + val fs = hdfsPath.getFileSystem(hadoopConf) + hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory) } }
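The final hunk qualifies user-supplied base paths before they are compared against the qualified leaf directories produced by listLeafFiles and listLeafFilesInParallel. A small standalone sketch of what that qualification does, with a hypothetical local-filesystem path:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    // basePaths are matched with plain Path equality inside parsePartition, so a path
    // without a scheme or authority would never equal the fully qualified paths
    // returned by the listing.
    val hadoopConf = new Configuration()
    val userPath   = new Path("/warehouse/events")              // as typed by the user
    val fs         = userPath.getFileSystem(hadoopConf)
    val qualified  = userPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
    // e.g. file:/warehouse/events on a local filesystem, matching the listed leaf dirs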