16 changes: 2 additions & 14 deletions docs/sql-data-sources-binaryFile.md
@@ -28,21 +28,9 @@ It produces a DataFrame with the following columns and possibly partition columns
* `length`: LongType
* `content`: BinaryType

It supports the following read option:
<table class="table">
<tr><th><b>Property Name</b></th><th><b>Default</b></th><th><b>Meaning</b></th></tr>
<tr>
<td><code>pathGlobFilter</code></td>
<td>none (accepts all)</td>
<td>
An optional glob pattern to only include files with paths matching the pattern.
The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
It does not change the behavior of partition discovery.
</td>
</tr>
</table>

To read whole binary files, you need to specify the data source `format` as `binaryFile`.
To load files with paths matching a given glob pattern while keeping the behavior of partition discovery,
you can use the general data source option `pathGlobFilter`.
For example, the following code reads all PNG files from the input directory:

<div class="codetabs">
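For reference, a minimal Scala sketch of the kind of snippet the example include above points at (the input directory is hypothetical; `pathGlobFilter` is passed as the general file data source option this PR introduces):

    // Read every PNG file under the input directory as a whole binary file.
    // The resulting DataFrame has path, modificationTime, length and content columns.
    val pngFiles = spark.read.format("binaryFile")
      .option("pathGlobFilter", "*.png")
      .load("/path/to/inputDir")
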
21 changes: 21 additions & 0 deletions docs/sql-data-sources-load-save-functions.md
@@ -102,6 +102,27 @@ To load a CSV file you can use:
</div>
</div>

To load files with paths matching a given glob pattern while keeping the behavior of partition discovery,
you can use:

<div class="codetabs">
<div data-lang="scala" markdown="1">
{% include_example load_with_path_glob_filter scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
</div>

<div data-lang="java" markdown="1">
{% include_example load_with_path_glob_filter java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
</div>

<div data-lang="python" markdown="1">
{% include_example load_with_path_glob_filter python/sql/datasource.py %}
</div>

<div data-lang="r" markdown="1">
{% include_example load_with_path_glob_filter r/RSparkSQLExample.R %}
</div>
</div>

The extra options are also used during write operation.
For example, you can control bloom filters and dictionary encodings for ORC data sources.
The following ORC example will create bloom filter and use dictionary encoding only for `favorite_color`.
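The ORC write example referenced above is cut off by this hunk; a rough Scala sketch of what it looks like, with the option values taken from the partial Scala/Java diffs and the R example further down:

    // Create a bloom filter and use dictionary encoding only for favorite_color.
    usersDF.write.format("orc")
      .option("orc.bloom.filter.columns", "favorite_color")
      .option("orc.dictionary.key.threshold", "1.0")
      .option("orc.column.encoding.direct", "name")
      .save("users_with_options.orc")
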
examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java
@@ -123,6 +123,11 @@ private static void runBasicDataSourceExample(SparkSession spark) {
.option("header", "true")
.load("examples/src/main/resources/people.csv");
// $example off:manual_load_options_csv$
// $example on:load_with_path_glob_filter$
Dataset<Row> partitionedUsersDF = spark.read().format("orc")
.option("pathGlobFilter", "*.orc")
.load("examples/src/main/resources/partitioned_users.orc");
// $example off:load_with_path_glob_filter$
// $example on:manual_save_options_orc$
usersDF.write().format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
5 changes: 5 additions & 0 deletions examples/src/main/python/sql/datasource.py
@@ -57,6 +57,11 @@ def basic_datasource_example(spark):
format="csv", sep=":", inferSchema="true", header="true")
# $example off:manual_load_options_csv$

# $example on:load_with_path_glob_filter$
df = spark.read.load("examples/src/main/resources/partitioned_users.orc",
format="orc", pathGlobFilter="*.orc")
# $example off:load_with_path_glob_filter$

# $example on:manual_save_options_orc$
df = spark.read.orc("examples/src/main/resources/users.orc")
(df.write.format("orc")
4 changes: 4 additions & 0 deletions examples/src/main/r/RSparkSQLExample.R
@@ -118,6 +118,10 @@ df <- read.df("examples/src/main/resources/people.csv", "csv", sep = ";", inferS
namesAndAges <- select(df, "name", "age")
# $example off:manual_load_options_csv$

# $example on:load_with_path_glob_filter$
df <- read.df("examples/src/main/resources/partitioned_users.orc", "orc", pathGlobFilter = "*.orc")
# $example off:load_with_path_glob_filter$

# $example on:manual_save_options_orc$
df <- read.df("examples/src/main/resources/users.orc", "orc")
write.orc(df, "users_with_options.orc", orc.bloom.filter.columns = "favorite_color", orc.dictionary.key.threshold = 1.0, orc.column.encoding.direct = "name")
@@ -0,0 +1 @@
do not read this
Binary file not shown.
Binary file not shown.
examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala
@@ -56,6 +56,11 @@ object SQLDataSourceExample {
.option("header", "true")
.load("examples/src/main/resources/people.csv")
// $example off:manual_load_options_csv$
// $example on:load_with_path_glob_filter$
val partitionedUsersDF = spark.read.format("orc")
.option("pathGlobFilter", "*.orc")
.load("examples/src/main/resources/partitioned_users.orc")
// $example off:load_with_path_glob_filter$
// $example on:manual_save_options_orc$
usersDF.write.format("orc")
.option("orc.bloom.filter.columns", "favorite_color")
@@ -57,6 +57,10 @@ private[avro] class AvroFileFormat extends FileFormat
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
val conf = spark.sessionState.newHadoopConf()
if (options.contains("ignoreExtension")) {
Member: "ignoreExtension" -> AvroOptions.ignoreExtensionKey

logWarning(s"Option ${AvroOptions.ignoreExtensionKey} is deprecated. Please use the " +
"general data source option pathGlobFilter for filtering file names.")
}
val parsedOptions = new AvroOptions(options, conf)

// User can specify an optional avro json schema.
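The warning points users at the general `pathGlobFilter` option as the replacement; a hedged migration sketch (the directory path is hypothetical):

    // Previously: .option("ignoreExtension", "false") to read only files ending in .avro.
    // With the general file source option, the same intent is expressed as a glob on the file name:
    val avroDf = spark.read.format("avro")
      .option("pathGlobFilter", "*.avro")
      .load("/data/avro-events")
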
@@ -59,14 +59,15 @@ class AvroOptions(
* If the option is not set, the Hadoop's config `avro.mapred.ignore.inputs.without.extension`
* is taken into account. If the former one is not set too, file extensions are ignored.
*/
@deprecated("Use the general data source option pathGlobFilter for filtering file names", "3.0")
Member: Who is this deprecation warning for? Spark users don't use ignoreExtension directly. I do think we should print a warning when we read and detect that AvroFileFormat.IgnoreFilesWithoutExtensionProperty and/or AvroOptions.ignoreExtensionKey are set; otherwise users will never see the deprecation.

Member: It is used only in one case, at schema inference. I would remove this annotation and print the warning in the initialization of AvroOptions. The deprecation warning is printed only when Spark itself is compiled, which is useless for users.

Member: I think we should remove @deprecated. It would be great if we could put that logic into AvroOptions, e.g.:

    parameters
      .get(AvroOptions.ignoreExtensionKey)
      .map { v =>
        logWarning(...)
        v.toBoolean
      }.getOrElse(!ignoreFilesWithoutExtension)

However, can you make sure it doesn't show the same log too many times? If we put it there, it seems it will be printed multiple times.

Member: If you can find a better way, please go ahead and open a PR (and address some nits I picked below).

val ignoreExtension: Boolean = {
val ignoreFilesWithoutExtensionByDefault = false
val ignoreFilesWithoutExtension = conf.getBoolean(
AvroFileFormat.IgnoreFilesWithoutExtensionProperty,
ignoreFilesWithoutExtensionByDefault)

parameters
.get("ignoreExtension")
.get(AvroOptions.ignoreExtensionKey)
Member: ignoreExtensionKey -> IGNORE_EXTENSION_KEY, to be consistent with other XXXOptions

.map(_.toBoolean)
.getOrElse(!ignoreFilesWithoutExtension)
}
@@ -93,4 +94,6 @@ object AvroOptions {
.getOrElse(new Configuration())
new AvroOptions(CaseInsensitiveMap(parameters), hadoopConf)
}

val ignoreExtensionKey = "ignoreExtension"
}
6 changes: 6 additions & 0 deletions python/pyspark/sql/readwriter.py
@@ -120,6 +120,9 @@ def option(self, key, value):
* ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps
in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.

Member: Sorry, actually can we move this documentation to each implementation of CSV, Parquet, ORC, text? It will only work with such internal file-based sources.
"""
self._jreader = self._jreader.option(key, to_str(value))
return self
@@ -132,6 +135,9 @@ def options(self, **options):
* ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps
in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.
"""
for k in options:
self._jreader = self._jreader.option(k, to_str(options[k]))
6 changes: 6 additions & 0 deletions python/pyspark/sql/streaming.py
@@ -341,6 +341,9 @@ def option(self, key, value):
* ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps
in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.

.. note:: Evolving.

@@ -357,6 +360,9 @@ def options(self, **options):
* ``timeZone``: sets the string that indicates a timezone to be used to parse timestamps
in the JSON/CSV datasources or partition values.
If it isn't set, it uses the default value, session local timezone.
* ``pathGlobFilter``: an optional glob pattern to only include files with paths matching
the pattern. The syntax follows org.apache.hadoop.fs.GlobFilter.
It does not change the behavior of partition discovery.

.. note:: Evolving.

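The same option text is added to the streaming reader docstrings; a minimal sketch of passing it on a file stream source (the schema and path are hypothetical, and this assumes the file stream source honors the general option as these docs suggest):

    // Pick up only *.json files from the monitored directory.
    val events = spark.readStream
      .format("json")
      .schema(eventSchema)  // file stream sources require an explicit schema
      .option("pathGlobFilter", "*.json")
      .load("/data/incoming")
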
@@ -98,6 +98,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
* to be used to parse timestamps in the JSON/CSV datasources or partition values.</li>
* <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
* It does not change the behavior of partition discovery.</li>
* </ul>
*
* @since 1.4.0
@@ -135,6 +138,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
* to be used to parse timestamps in the JSON/CSV datasources or partition values.</li>
* <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
* It does not change the behavior of partition discovery.</li>
* </ul>
*
* @since 1.4.0
@@ -151,6 +157,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <ul>
* <li>`timeZone` (default session local timezone): sets the string that indicates a timezone
* to be used to parse timestamps in the JSON/CSV datasources or partition values.</li>
* <li>`pathGlobFilter`: an optional glob pattern to only include files with paths matching
* the pattern. The syntax follows <code>org.apache.hadoop.fs.GlobFilter</code>.
* It does not change the behavior of partition discovery.</li>
* </ul>
*
* @since 1.4.0
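On the batch side the option travels through `option`/`options` like any other reader option; a minimal sketch (the path is hypothetical):

    // Restrict the load to *.csv files; files with other extensions in the
    // same directories are skipped, and partition discovery is unchanged.
    val logs = spark.read
      .format("csv")
      .option("header", "true")
      .option("pathGlobFilter", "*.csv")
      .load("/data/logs")
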
@@ -348,7 +348,8 @@ case class DataSource(
sparkSession.sessionState.newHadoopConf(),
sparkSession.sessionState.conf) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema)
val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath,
caseInsensitiveOptions, userSpecifiedSchema)
val dataSchema = userSpecifiedSchema.orElse {
format.inferSchema(
sparkSession,
@@ -56,6 +56,12 @@ abstract class PartitioningAwareFileIndex(

protected def leafDirToChildrenFiles: Map[Path, Array[FileStatus]]

protected lazy val pathGlobFilter = parameters.get("pathGlobFilter").map(new GlobFilter(_))

protected def matchGlobPattern(file: FileStatus): Boolean = {
pathGlobFilter.forall(_.accept(file.getPath))
}

override def listFiles(
partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
def isNonEmptyFile(f: FileStatus): Boolean = {
@@ -69,7 +75,7 @@ abstract class PartitioningAwareFileIndex(
val files: Seq[FileStatus] = leafDirToChildrenFiles.get(path) match {
case Some(existingDir) =>
// Directory has children files in it, return them
existingDir.filter(isNonEmptyFile)
existingDir.filter(f => matchGlobPattern(f) && isNonEmptyFile(f))

case None =>
// Directory does not exist, or has no children files
@@ -89,7 +95,7 @@ abstract class PartitioningAwareFileIndex(
override def sizeInBytes: Long = allFiles().map(_.getLen).sum

def allFiles(): Seq[FileStatus] = {
if (partitionSpec().partitionColumns.isEmpty) {
val files = if (partitionSpec().partitionColumns.isEmpty) {
// For each of the root input paths, get the list of files inside them
rootPaths.flatMap { path =>
// Make the path qualified (consistent with listLeafFiles and bulkListLeafFiles).
@@ -118,6 +124,7 @@ abstract class PartitioningAwareFileIndex(
} else {
leafFiles.values.toSeq
}
files.filter(matchGlobPattern)
}

protected def inferPartitioning(): PartitionSpec = {
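The filter is applied both in `listFiles` and in `allFiles`, so partition columns are still discovered from the directory layout while individual files are filtered by the glob pattern. A short sketch of the observable behavior under an assumed layout:

    // Assumed layout:
    //   /data/users/year=2019/part-0.orc
    //   /data/users/year=2019/do_not_read.txt
    //   /data/users/year=2020/part-0.orc
    val users = spark.read.format("orc")
      .option("pathGlobFilter", "*.orc")
      .load("/data/users")
    users.printSchema()  // still contains the discovered partition column `year`
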
@@ -47,12 +47,10 @@ import org.apache.spark.util.SerializableConfiguration
* {{{
* // Scala
* val df = spark.read.format("binaryFile")
* .option("pathGlobFilter", "*.png")
* .load("/path/to/fileDir")
*
* // Java
* Dataset<Row> df = spark.read().format("binaryFile")
* .option("pathGlobFilter", "*.png")
* .load("/path/to/fileDir");
* }}}
*/
@@ -98,44 +96,37 @@ class BinaryFileFormat extends FileFormat with DataSourceRegister {

val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
val binaryFileSourceOptions = new BinaryFileSourceOptions(options)
val pathGlobPattern = binaryFileSourceOptions.pathGlobFilter
val filterFuncs = filters.map(filter => createFilterFunction(filter))
val maxLength = sparkSession.conf.get(SOURCES_BINARY_FILE_MAX_LENGTH)

file: PartitionedFile => {
val path = new Path(file.filePath)
// TODO: Improve performance here: each file will recompile the glob pattern here.
if (pathGlobPattern.forall(new GlobFilter(_).accept(path))) {
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val status = fs.getFileStatus(path)
if (filterFuncs.forall(_.apply(status))) {
val writer = new UnsafeRowWriter(requiredSchema.length)
writer.resetRowWriter()
requiredSchema.fieldNames.zipWithIndex.foreach {
case (PATH, i) => writer.write(i, UTF8String.fromString(status.getPath.toString))
case (LENGTH, i) => writer.write(i, status.getLen)
case (MODIFICATION_TIME, i) =>
writer.write(i, DateTimeUtils.fromMillis(status.getModificationTime))
case (CONTENT, i) =>
if (status.getLen > maxLength) {
throw new SparkException(
s"The length of ${status.getPath} is ${status.getLen}, " +
s"which exceeds the max length allowed: ${maxLength}.")
}
val stream = fs.open(status.getPath)
try {
writer.write(i, ByteStreams.toByteArray(stream))
} finally {
Closeables.close(stream, true)
}
case (other, _) =>
throw new RuntimeException(s"Unsupported field name: ${other}")
}
Iterator.single(writer.getRow)
} else {
Iterator.empty
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val status = fs.getFileStatus(path)
if (filterFuncs.forall(_.apply(status))) {
val writer = new UnsafeRowWriter(requiredSchema.length)
writer.resetRowWriter()
requiredSchema.fieldNames.zipWithIndex.foreach {
case (PATH, i) => writer.write(i, UTF8String.fromString(status.getPath.toString))
case (LENGTH, i) => writer.write(i, status.getLen)
case (MODIFICATION_TIME, i) =>
writer.write(i, DateTimeUtils.fromMillis(status.getModificationTime))
case (CONTENT, i) =>
if (status.getLen > maxLength) {
throw new SparkException(
s"The length of ${status.getPath} is ${status.getLen}, " +
s"which exceeds the max length allowed: ${maxLength}.")
}
val stream = fs.open(status.getPath)
try {
writer.write(i, ByteStreams.toByteArray(stream))
} finally {
Closeables.close(stream, true)
}
case (other, _) =>
throw new RuntimeException(s"Unsupported field name: ${other}")
}
Iterator.single(writer.getRow)
} else {
Iterator.empty
}
@@ -204,14 +195,3 @@ object BinaryFileFormat {
}
}

class BinaryFileSourceOptions(
@transient private val parameters: CaseInsensitiveMap[String]) extends Serializable {

def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

/**
* An optional glob pattern to only include files with paths matching the pattern.
* The syntax follows [[org.apache.hadoop.fs.GlobFilter]].
*/
val pathGlobFilter: Option[String] = parameters.get("pathGlobFilter")
}
@@ -25,6 +25,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.datasources.{DataSource, InMemoryFileIndex, LogicalRelation}
import org.apache.spark.sql.types.StructType

@@ -195,7 +196,8 @@ class FileStreamSource(
private def allFilesUsingMetadataLogFileIndex() = {
// Note if `sourceHasMetadata` holds, then `qualifiedBasePath` is guaranteed to be a
// non-glob path
new MetadataLogFileIndex(sparkSession, qualifiedBasePath, None).allFiles()
new MetadataLogFileIndex(sparkSession, qualifiedBasePath,
CaseInsensitiveMap(options), None).allFiles()
}

/**
@@ -36,8 +36,9 @@ import org.apache.spark.sql.types.StructType
class MetadataLogFileIndex(
sparkSession: SparkSession,
path: Path,
parameters: Map[String, String],
userSpecifiedSchema: Option[StructType])
extends PartitioningAwareFileIndex(sparkSession, Map.empty, userSpecifiedSchema) {
extends PartitioningAwareFileIndex(sparkSession, parameters, userSpecifiedSchema) {

private val metadataDirectory = {
val metadataDir = new Path(path, FileStreamSink.metadataDir)
Expand Down