From 317bd1b63cc38e82bc1b09c53a68d4dd202aa4e8 Mon Sep 17 00:00:00 2001
From: Andrey Taptunov
Date: Thu, 13 Jul 2017 12:50:42 +0200
Subject: [PATCH] [SPARK-21374][CORE] Fix reading globbed paths from S3 into DF with disabled FS cache

---
 .../org/apache/spark/deploy/SparkHadoopUtil.scala      | 12 ++++++++++--
 .../spark/sql/execution/datasources/DataSource.scala   |  4 ++--
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 6afe58bff5229..f113c04b90d7f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -226,14 +226,22 @@ class SparkHadoopUtil extends Logging {
   }
 
   def globPath(pattern: Path): Seq[Path] = {
-    val fs = pattern.getFileSystem(conf)
+    globPath(pattern, conf)
+  }
+
+  def globPath(pattern: Path, hadoopConf: Configuration): Seq[Path] = {
+    val fs = pattern.getFileSystem(hadoopConf)
     Option(fs.globStatus(pattern)).map { statuses =>
       statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
     }.getOrElse(Seq.empty[Path])
   }
 
   def globPathIfNecessary(pattern: Path): Seq[Path] = {
-    if (isGlobPath(pattern)) globPath(pattern) else Seq(pattern)
+    globPathIfNecessary(pattern, conf)
+  }
+
+  def globPathIfNecessary(pattern: Path, hadoopConf: Configuration): Seq[Path] = {
+    if (isGlobPath(pattern)) globPath(pattern, hadoopConf) else Seq(pattern)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d36a04f1fff8e..1da8f371f87a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -132,7 +132,7 @@ case class DataSource(
       val hdfsPath = new Path(path)
       val fs = hdfsPath.getFileSystem(hadoopConf)
       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      SparkHadoopUtil.get.globPathIfNecessary(qualified)
+      SparkHadoopUtil.get.globPathIfNecessary(qualified, hadoopConf)
     }.toArray
     new InMemoryFileIndex(sparkSession, globbedPaths, options, None, fileStatusCache)
   }
@@ -364,7 +364,7 @@ case class DataSource(
       val hdfsPath = new Path(path)
       val fs = hdfsPath.getFileSystem(hadoopConf)
       val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-      val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
+      val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified, hadoopConf)
       if (globPath.isEmpty) {
         throw new AnalysisException(s"Path does not exist: $qualified")
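
Notes: below is a minimal reproduction sketch of the kind of job this patch targets, not part of the patch itself. The bucket name and credential lookups are placeholders, and hadoop-aws/S3A on the classpath is assumed. As the diff shows, globPath previously built its FileSystem from SparkHadoopUtil's own conf rather than the session's hadoopConf; with the FileSystem cache disabled, the glob therefore created a fresh FileSystem that did not see settings applied to the session's Hadoop configuration, whereas with caching enabled the already-configured instance was reused and the problem stayed hidden.

    // Reproduction sketch (placeholder bucket and credentials).
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    val hc = spark.sparkContext.hadoopConfiguration
    hc.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
    hc.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))
    // Disabling the FileSystem cache forces a new FileSystem to be created when the
    // glob is resolved, which is where the wrong Configuration used to be picked up.
    hc.setBoolean("fs.s3a.impl.disable.cache", true)

    // Globbed S3 path: with this patch the glob is resolved using the session's hadoopConf.
    val df = spark.read.json("s3a://example-bucket/logs/2017/07/*")
    df.show()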