From 746a97ebd7c2f252c499357ac04177e0a98b1c31 Mon Sep 17 00:00:00 2001
From: wuyi
Date: Mon, 21 Oct 2019 20:58:48 +0800
Subject: [PATCH 1/8] throw exception when user defined a wrong base path

---
 .../PartitioningAwareFileIndex.scala            |  6 ++++++
 .../execution/datasources/FileIndexSuite.scala  | 18 ++++++++++++++++++
 2 files changed, 24 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 3adec2f79073..975fbc592155 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -221,6 +221,12 @@ abstract class PartitioningAwareFileIndex(
         if (!fs.isDirectory(userDefinedBasePath)) {
           throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory")
         }
+        rootPaths.find(!_.toString.startsWith(userDefinedBasePath.toString)) match {
+          case Some(rp) => throw new IllegalArgumentException(
+            s"Wrong basePath $userDefinedBasePath for the root path: $rp")
+
+          case None => // valid basePath
+        }
         Set(fs.makeQualified(userDefinedBasePath))
 
       case None =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index a7a2349a1dfb..19f39c2352da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -352,6 +352,24 @@ class FileIndexSuite extends SharedSparkSession {
       "driver side must not be negative"))
   }
 
+  test ("SPARK-29537: throw exception when user defined a wrong base path") {
+    withTempDir { dir =>
+      val partitionDirectory = new File(dir, "a=foo")
+      partitionDirectory.mkdir()
+      val file = new File(partitionDirectory, "text.txt")
+      stringToFile(file, "text")
+      val path = new Path(dir.getCanonicalPath)
+      val wrongBasePath = new File(dir, "unknown")
+      wrongBasePath.mkdir()
+      val parameters = Map("basePath" -> wrongBasePath.getCanonicalPath)
+      val fileIndex = new InMemoryFileIndex(spark, Seq(path), parameters, None)
+      intercept[IllegalArgumentException] {
+        // trigger inferPartitioning()
+        fileIndex.partitionSpec()
+      }
+    }
+  }
+
   test("refresh for InMemoryFileIndex with FileStatusCache") {
     withTempDir { dir =>
       val fileStatusCache = FileStatusCache.getOrCreate(spark)

From f7ecb159fdc1bf6c05e507585a0f98321e31d385 Mon Sep 17 00:00:00 2001
From: wuyi
Date: Tue, 22 Oct 2019 00:09:33 +0800
Subject: [PATCH 2/8] use Uri.Path

---
 .../sql/execution/datasources/PartitioningAwareFileIndex.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 975fbc592155..8581e436b28f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -221,7 +221,7 @@ abstract class PartitioningAwareFileIndex(
         if (!fs.isDirectory(userDefinedBasePath)) {
           throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory")
         }
-        rootPaths.find(!_.toString.startsWith(userDefinedBasePath.toString)) match {
+        rootPaths.find(!_.toUri.getPath.startsWith(userDefinedBasePath.toUri.getPath)) match {
           case Some(rp) => throw new IllegalArgumentException(
             s"Wrong basePath $userDefinedBasePath for the root path: $rp")
 

From 358bc0e55e83cfcd4885a6031ded0fc0bffb4109 Mon Sep 17 00:00:00 2001
From: wuyi
Date: Wed, 27 Nov 2019 16:49:09 +0800
Subject: [PATCH 3/8] use qualified path

---
 .../execution/datasources/PartitioningAwareFileIndex.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 8581e436b28f..ec960cc439b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -221,7 +221,11 @@ abstract class PartitioningAwareFileIndex(
         if (!fs.isDirectory(userDefinedBasePath)) {
           throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory")
         }
-        rootPaths.find(!_.toUri.getPath.startsWith(userDefinedBasePath.toUri.getPath)) match {
+        def qualifiedPath(path: Path): Path = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+
+        val qualifiedBasePath = qualifiedPath(userDefinedBasePath)
+        rootPaths.find(p => !qualifiedPath(p).toString.
+          startsWith(qualifiedBasePath.toString)) match {
           case Some(rp) => throw new IllegalArgumentException(
             s"Wrong basePath $userDefinedBasePath for the root path: $rp")
 

From 617ab8ffe08083f7763d8ee1d1577d2cef134dad Mon Sep 17 00:00:00 2001
From: wuyi
Date: Wed, 27 Nov 2019 16:49:26 +0800
Subject: [PATCH 4/8] improve test

---
 .../spark/sql/execution/datasources/FileIndexSuite.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 19f39c2352da..654bab85b950 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -363,10 +363,11 @@ class FileIndexSuite extends SharedSparkSession {
       wrongBasePath.mkdir()
       val parameters = Map("basePath" -> wrongBasePath.getCanonicalPath)
       val fileIndex = new InMemoryFileIndex(spark, Seq(path), parameters, None)
-      intercept[IllegalArgumentException] {
+      val msg = intercept[IllegalArgumentException] {
         // trigger inferPartitioning()
         fileIndex.partitionSpec()
-      }
+      }.getMessage
+      assert(msg === s"Wrong basePath ${wrongBasePath.getCanonicalPath} for the root path: $path")
     }
   }
 

From 261b9adfe00ebe2def6078523adf9361e69abf1e Mon Sep 17 00:00:00 2001
From: wuyi
Date: Thu, 28 Nov 2019 11:28:53 +0800
Subject: [PATCH 5/8] add test in DataFrameReaderWriterSuite

---
 .../execution/datasources/FileIndexSuite.scala  |  1 +
 .../sql/test/DataFrameReaderWriterSuite.scala   | 15 +++++++++++++++
 2 files changed, 16 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
index 654bab85b950..553773e2555c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileIndexSuite.scala
@@ -360,6 +360,7 @@ class FileIndexSuite extends SharedSparkSession {
       stringToFile(file, "text")
       val path = new Path(dir.getCanonicalPath)
       val wrongBasePath = new File(dir, "unknown")
+      // basePath must be a directory
       wrongBasePath.mkdir()
       val parameters = Map("basePath" -> wrongBasePath.getCanonicalPath)
       val fileIndex = new InMemoryFileIndex(spark, Seq(path), parameters, None)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index cef0e5ab4756..55a60940a775 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -234,6 +234,21 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     assert(DataSourceUtils.decodePartitioningColumns(partColumns) === Seq("col1", "col2"))
   }
 
+  test ("SPARK-29537: throw exception when user defined a wrong base path") {
+    withTempPath { p =>
+      val path = new Path(p.toURI).toString
+      Seq((1, 1), (2, 2)).toDF("c1", "c2")
+        .write.partitionBy("c1").mode(SaveMode.Overwrite).parquet(path)
+      val wrongBasePath = new File(p, "unknown")
+      // basePath must be a directory
+      wrongBasePath.mkdir()
+      val msg = intercept[IllegalArgumentException] {
+        spark.read.option("basePath", wrongBasePath.getCanonicalPath).parquet(path)
+      }.getMessage
+      assert(msg === s"Wrong basePath ${wrongBasePath.getCanonicalPath} for the root path: $path")
+    }
+  }
+
   test("save mode") {
     spark.range(10).write
       .format("org.apache.spark.sql.test")

From 66f0bd36cdf64cfac11ed7199badfa820e7f3d38 Mon Sep 17 00:00:00 2001
From: wuyi
Date: Thu, 28 Nov 2019 11:35:23 +0800
Subject: [PATCH 6/8] address comment

---
 .../datasources/PartitioningAwareFileIndex.scala | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index ec960cc439b0..b7637a28a3d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -224,13 +224,12 @@ abstract class PartitioningAwareFileIndex(
         def qualifiedPath(path: Path): Path = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
 
         val qualifiedBasePath = qualifiedPath(userDefinedBasePath)
-        rootPaths.find(p => !qualifiedPath(p).toString.
-          startsWith(qualifiedBasePath.toString)) match {
-          case Some(rp) => throw new IllegalArgumentException(
-            s"Wrong basePath $userDefinedBasePath for the root path: $rp")
-
-          case None => // valid basePath
-        }
+        rootPaths
+          .find(p => !qualifiedPath(p).toString.startsWith(qualifiedBasePath.toString))
+          .foreach { rp =>
+            throw new IllegalArgumentException(
+              s"Wrong basePath $userDefinedBasePath for the root path: $rp")
+          }
         Set(fs.makeQualified(userDefinedBasePath))
 
       case None =>

From e270fea3f9875614b81950c9630970cd4bc8aee4 Mon Sep 17 00:00:00 2001
From: wuyi
Date: Mon, 2 Dec 2019 20:39:47 +0800
Subject: [PATCH 7/8] address comments

---
 .../execution/datasources/PartitioningAwareFileIndex.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index b7637a28a3d3..68b499516975 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -221,16 +221,16 @@ abstract class PartitioningAwareFileIndex(
         if (!fs.isDirectory(userDefinedBasePath)) {
           throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory")
         }
-        def qualifiedPath(path: Path): Path = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+        def qualifiedPath(path: Path): String = fs.makeQualified(path).toString
 
         val qualifiedBasePath = qualifiedPath(userDefinedBasePath)
         rootPaths
-          .find(p => !qualifiedPath(p).toString.startsWith(qualifiedBasePath.toString))
+          .find(p => !qualifiedPath(p).startsWith(qualifiedBasePath))
           .foreach { rp =>
             throw new IllegalArgumentException(
               s"Wrong basePath $userDefinedBasePath for the root path: $rp")
           }
-        Set(fs.makeQualified(userDefinedBasePath))
+        Set(new Path(qualifiedBasePath))
 
       case None =>
         rootPaths.map { path =>

From e889cda5b1ef4eb28dbdc276926a2318be3df531 Mon Sep 17 00:00:00 2001
From: wuyi
Date: Mon, 2 Dec 2019 21:30:30 +0800
Subject: [PATCH 8/8] address comment

---
 .../datasources/PartitioningAwareFileIndex.scala | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
index 68b499516975..7bfe42931f4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileIndex.scala
@@ -221,16 +221,15 @@ abstract class PartitioningAwareFileIndex(
         if (!fs.isDirectory(userDefinedBasePath)) {
           throw new IllegalArgumentException(s"Option '$BASE_PATH_PARAM' must be a directory")
         }
-        def qualifiedPath(path: Path): String = fs.makeQualified(path).toString
-
-        val qualifiedBasePath = qualifiedPath(userDefinedBasePath)
+        val qualifiedBasePath = fs.makeQualified(userDefinedBasePath)
+        val qualifiedBasePathStr = qualifiedBasePath.toString
         rootPaths
-          .find(p => !qualifiedPath(p).startsWith(qualifiedBasePath))
+          .find(!fs.makeQualified(_).toString.startsWith(qualifiedBasePathStr))
          .foreach { rp =>
            throw new IllegalArgumentException(
              s"Wrong basePath $userDefinedBasePath for the root path: $rp")
          }
-        Set(new Path(qualifiedBasePath))
+        Set(qualifiedBasePath)
 
       case None =>
         rootPaths.map { path =>
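For reference, here is a minimal sketch of the user-facing behaviour this series adds, mirroring the DataFrameReaderWriterSuite test above. Everything in it is illustrative rather than part of the patches: it assumes a locally created SparkSession named spark, and the temp-directory and table names are made up for the example.

    import org.apache.spark.sql.SparkSession

    // Illustrative local setup; any existing SparkSession behaves the same way.
    val spark = SparkSession.builder().master("local[1]").appName("SPARK-29537-demo").getOrCreate()
    import spark.implicits._

    val tmp = java.nio.file.Files.createTempDirectory("spark-29537").toFile
    val dataPath = new java.io.File(tmp, "table").getCanonicalPath

    // Write a dataset partitioned by c1, producing <dataPath>/c1=.../ directories.
    Seq((1, 1), (2, 2)).toDF("c1", "c2")
      .write.partitionBy("c1").mode("overwrite").parquet(dataPath)

    // A basePath that is not an ancestor of the root path is now rejected up front.
    val wrongBasePath = new java.io.File(tmp, "unknown")
    wrongBasePath.mkdir() // basePath must still be an existing directory

    try {
      spark.read.option("basePath", wrongBasePath.getCanonicalPath).parquet(dataPath)
    } catch {
      case e: IllegalArgumentException =>
        // Expected message shape: Wrong basePath <basePath> for the root path: <rootPath>
        println(e.getMessage)
    }

Note that the final revision of the check compares fully qualified path strings (fs.makeQualified) on both sides, so a bare local basePath and a file:-qualified root path are still matched consistently.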