
Commit ad4d4de

Enables HDFS style globbing
1 parent 8d12e69 commit ad4d4de

5 files changed: +80 additions, −7 deletions


core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 7 additions & 0 deletions

@@ -231,6 +231,13 @@ class SparkHadoopUtil extends Logging {
     recurse(baseStatus)
   }
 
+  def globPath(pattern: Path): Seq[Path] = {
+    val fs = pattern.getFileSystem(conf)
+    Option(fs.globStatus(pattern)).map { statuses =>
+      statuses.map(_.getPath.makeQualified(fs.getUri, fs.getWorkingDirectory)).toSeq
+    }.getOrElse(Seq.empty[Path])
+  }
+
   /**
    * Lists all the files in a directory with the specified prefix, and does not end with the
    * given suffix. The returned {{FileStatus}} instances are sorted by the modification times of
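The new SparkHadoopUtil.globPath helper wraps Hadoop's FileSystem.globStatus, qualifies every match against the file system's URI and working directory, and returns an empty Seq when nothing matches (globStatus returns null in that case). A minimal usage sketch; the pattern string is illustrative only, not part of the commit:

import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil

// Expand a Hadoop-style glob into fully qualified, concrete paths.
// An unmatched pattern yields Seq.empty rather than an error.
val matched: Seq[Path] =
  SparkHadoopUtil.get.globPath(new Path("hdfs:///data/logs/p1=*/p2=???"))
matched.foreach(println)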

sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala

Lines changed: 30 additions & 4 deletions

@@ -20,7 +20,10 @@ package org.apache.spark.sql.sources
 import scala.language.{existentials, implicitConversions}
 import scala.util.matching.Regex
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.Logging
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Row}
@@ -157,7 +160,7 @@ private[sql] class DDLParser(
   protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")}
 
   override implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch(
-    s"identifier matching regex ${regex}", {
+    s"identifier matching regex $regex", {
       case lexical.Identifier(str) if regex.unapplySeq(str).isDefined => str
       case lexical.Keyword(str) if regex.unapplySeq(str).isDefined => str
     }
@@ -230,11 +233,18 @@ private[sql] object ResolvedDataSource {
           Some(partitionColumnsSchema(schema, partitionColumns))
         }
 
+        val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+        val paths = {
+          val patternPath = new Path(caseInsensitiveOptions("path"))
+          SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
+        }
+
        dataSource.createRelation(
          sqlContext,
+         paths,
          Some(schema),
          maybePartitionsSchema,
-         new CaseInsensitiveMap(options))
+         caseInsensitiveOptions)
      case dataSource: org.apache.spark.sql.sources.RelationProvider =>
        throw new AnalysisException(s"$className does not allow user-specified schemas.")
      case _ =>
@@ -245,7 +255,12 @@ private[sql] object ResolvedDataSource {
      case dataSource: RelationProvider =>
        dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
      case dataSource: FSBasedRelationProvider =>
-       dataSource.createRelation(sqlContext, None, None, new CaseInsensitiveMap(options))
+       val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+       val paths = {
+         val patternPath = new Path(caseInsensitiveOptions("path"))
+         SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
+       }
+       dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
      case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
        throw new AnalysisException(
          s"A schema needs to be specified when using $className.")
@@ -280,11 +295,22 @@ private[sql] object ResolvedDataSource {
      case dataSource: CreatableRelationProvider =>
        dataSource.createRelation(sqlContext, mode, options, data)
      case dataSource: FSBasedRelationProvider =>
+       // Don't glob path for the write path. The contracts here are:
+       //  1. Only one output path can be specified on the write path;
+       //  2. Output path must be a legal HDFS style file system path;
+       //  3. It's OK that the output path doesn't exist yet;
+       val caseInsensitiveOptions = new CaseInsensitiveMap(options)
+       val outputPath = {
+         val path = new Path(caseInsensitiveOptions("path"))
+         val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+         path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+       }
        val r = dataSource.createRelation(
          sqlContext,
+         Array(outputPath.toString),
          Some(data.schema),
          Some(partitionColumnsSchema(data.schema, partitionColumns)),
-         options)
+         caseInsensitiveOptions)
        sqlContext.executePlan(
          InsertIntoFSBasedRelation(
            r,
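With these changes the read path treats the "path" option as a glob pattern and expands it via SparkHadoopUtil.globPath before the relation is created, while the write path keeps a single output path and only qualifies it. A hedged sketch of the read-path expansion in isolation (expandReadPaths is a local helper for this example, not part of the commit):

import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil

// Mirror of the expansion now done in ResolvedDataSource: take the user-supplied
// "path" option, treat it as a glob, and return the matching paths as strings.
def expandReadPaths(options: Map[String, String]): Array[String] = {
  val patternPath = new Path(options("path"))
  SparkHadoopUtil.get.globPath(patternPath).map(_.toString).toArray
}

// e.g. expandReadPaths(Map("path" -> "/warehouse/t/p1=*/p2=???"))
// returns every partition directory matching the pattern.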

sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala

Lines changed: 1 addition & 0 deletions

@@ -114,6 +114,7 @@ trait FSBasedRelationProvider {
    */
   def createRelation(
       sqlContext: SQLContext,
+      paths: Array[String],
       schema: Option[StructType],
       partitionColumns: Option[StructType],
       parameters: Map[String, String]): FSBasedRelation
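FSBasedRelationProvider implementations now receive the already glob-expanded paths instead of re-reading the raw "path" option. A skeleton of a provider adapted to the new signature (MyTextSource is a placeholder name for illustration; SimpleTextSource below is the real in-tree example):

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{FSBasedRelation, FSBasedRelationProvider}
import org.apache.spark.sql.types.StructType

class MyTextSource extends FSBasedRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      paths: Array[String],           // already glob-expanded by ResolvedDataSource
      schema: Option[StructType],
      partitionColumns: Option[StructType],
      parameters: Map[String, String]): FSBasedRelation = {
    // A real provider would construct its FSBasedRelation from `paths` here;
    // note that parameters("path") may still contain the raw, unexpanded glob.
    ???
  }
}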

sql/hive/src/test/scala/org/apache/spark/sql/sources/FSBasedRelationSuite.scala

Lines changed: 39 additions & 0 deletions

@@ -483,4 +483,43 @@ class FSBasedRelationSuite extends QueryTest with ParquetTest {
       assert(table("t").collect().isEmpty)
     }
   }
+
+  test("Hadoop style globbing") {
+    withTempPath { file =>
+      partitionedTestDF.save(
+        source = classOf[SimpleTextSource].getCanonicalName,
+        mode = SaveMode.Overwrite,
+        options = Map("path" -> file.getCanonicalPath),
+        partitionColumns = Seq("p1", "p2"))
+
+      val df = load(
+        source = classOf[SimpleTextSource].getCanonicalName,
+        options = Map(
+          "path" -> s"${file.getCanonicalPath}/p1=*/p2=???",
+          "dataSchema" -> dataSchema.json))
+
+      val expectedPaths = Set(
+        s"${file.getCanonicalFile}/p1=1/p2=foo",
+        s"${file.getCanonicalFile}/p1=2/p2=foo",
+        s"${file.getCanonicalFile}/p1=1/p2=bar",
+        s"${file.getCanonicalFile}/p1=2/p2=bar"
+      ).map { p =>
+        val path = new Path(p)
+        val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+        path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString
+      }
+
+      println(df.queryExecution)
+
+      val actualPaths = df.queryExecution.analyzed.collectFirst {
+        case LogicalRelation(relation: FSBasedRelation) =>
+          relation.paths.toSet
+      }.getOrElse {
+        fail("Expect an FSBasedRelation, but none could be found")
+      }
+
+      assert(actualPaths === expectedPaths)
+      checkAnswer(df, partitionedTestDF.collect())
+    }
+  }
 }
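The test relies on standard Hadoop glob semantics: * matches any run of characters within one path segment and each ? matches exactly one character, so p2=foo and p2=bar match p2=??? while longer or shorter values would not. A quick way to try a pattern against a directory (the local path below is illustrative):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val pattern = new Path("file:///tmp/globbing-demo/p1=*/p2=???")
val fs = pattern.getFileSystem(new Configuration())
// globStatus returns null when nothing matches, which is why globPath wraps it in Option(...).
val matches = Option(fs.globStatus(pattern)).map(_.map(_.getPath)).getOrElse(Array.empty[Path])
matches.foreach(println)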

sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala

Lines changed: 3 additions & 3 deletions

@@ -37,12 +37,12 @@ import org.apache.spark.sql.{Row, SQLContext}
 class SimpleTextSource extends FSBasedRelationProvider {
   override def createRelation(
       sqlContext: SQLContext,
+      paths: Array[String],
       schema: Option[StructType],
       partitionColumns: Option[StructType],
       parameters: Map[String, String]): FSBasedRelation = {
-    new SimpleTextRelation(
-      Array(parameters("path")), schema,
-      partitionColumns.getOrElse(StructType(Array.empty[StructField])), parameters)(sqlContext)
+    val partitionsSchema = partitionColumns.getOrElse(StructType(Array.empty[StructField]))
+    new SimpleTextRelation(paths, schema, partitionsSchema, parameters)(sqlContext)
   }
 }
