# [SPARK-25348][SQL] Data source for binary files #24354
Conversation
```scala
/**
 * `binaryfile` package implements Spark SQL data source API for loading binary file data
 * as `DataFrame`.
 *
```
Please also document how to control the input partition size. cc: @cloud-fan
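For context, a minimal sketch of how input partition size is controlled today: the PR relies on two existing SQL confs rather than a dedicated `maxBytesPerPartition` option (the conf names are taken from the PR description; the values shown are only illustrative).

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Input partition sizing for file-based sources is driven by these SQL confs.
// Values here are illustrative, not recommendations.
spark.conf.set("spark.sql.files.maxPartitionBytes", "134217728") // max bytes packed into one partition (128 MB)
spark.conf.set("spark.sql.files.openCostInBytes", "4194304")     // estimated cost of opening a file (4 MB)
```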
Test build #104531 has finished for PR 24354 at commit

Test build #104533 has finished for PR 24354 at commit

Jenkins, retest this please.

Test build #104534 has finished for PR 24354 at commit
Test build #104538 has finished for PR 24354 at commit
mengxr left a comment: Still need some minor changes.
@cloud-fan @gatorsmile I think this PR is almost ready to merge. Could you make a pass?
Test build #104556 has finished for PR 24354 at commit

Test build #104557 has finished for PR 24354 at commit
```scala
    sparkSession: SparkSession,
    options: Map[String, String],
    path: Path): Boolean = {
  false
```
Are we sure about this? Always returning false means one file per RDD partition.

I don't think binary files should be splittable, like `binaryFiles`, because usually one binary file is the minimal logical unit for arbitrary binary data.

@cloud-fan Does it mean that the file itself cannot be split into multiple parts? It shouldn't lead to one file per partition.

If `isSplitable` returns false, then Spark can only read the entire file with a single thread, so it's one file per partition. File splitting is actually very complicated. For example, the text format splits a file w.r.t. line boundaries: a line of text will not be split into multiple partitions. I'm not sure how to define the splitting logic for binary files.

> It shouldn't lead to one file per partition.

@mengxr, do you mean one binary file should be split into multiple parts? In that case, the splitting rule needs to be defined and fixed so that end users can process the parts. If users are not aware of the splitting rule, there wouldn't be a way for them to use it (for instance, an image). I thought this would be implemented by `maxBytesPerPartition`, with one file per partition by default.

@cloud-fan Why does a single thread lead to one file per partition? One partition can still hold multiple files, but one file cannot be split into multiple records. @HyukjinKwon We don't want to split a file into parts.

To be clear: one file per file partition. It's still possible that one RDD partition contains many files.

(Oops, I was confused; I thought we were talking about one partition that has multiple parts.)
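To illustrate the distinction (whole files as indivisible read units, yet possibly many files per RDD partition), here is a heavily simplified, hypothetical sketch of the packing step; `FileSlice` and `packFiles` are made-up names, not Spark's actual `FilePartition` code:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified file-packing: with isSplitable == false, each file
// is one indivisible slice, but one partition may still hold many small files.
case class FileSlice(path: String, length: Long)

def packFiles(files: Seq[FileSlice], maxPartitionBytes: Long): Seq[Seq[FileSlice]] = {
  val partitions = ArrayBuffer.empty[Seq[FileSlice]]
  val current = ArrayBuffer.empty[FileSlice]
  var currentSize = 0L
  for (file <- files.sortBy(-_.length)) {
    if (currentSize + file.length > maxPartitionBytes && current.nonEmpty) {
      partitions += current.toList // close the current partition
      current.clear()
      currentSize = 0L
    }
    current += file                // a whole file always goes in as one unit
    currentSize += file.length
  }
  if (current.nonEmpty) partitions += current.toList
  partitions.toList
}
```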
```scala
// TODO: Improve performance here: each file will recompile the glob pattern here.
val globFilter = if (pathGlobPattern.isEmpty) { null } else {
  new GlobFilter(pathGlobPattern)
```
Is `GlobFilter` serializable? If it is, then we can create it outside of the closure.

It's not serializable, so I put it inside.
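A minimal sketch of the resulting pattern, assuming `pathGlobPattern` holds the option value and `fsPath` is the current file's path (names taken from the quoted code): since Hadoop's `GlobFilter` is not `Serializable`, it is rebuilt where the file is processed rather than captured from the driver.

```scala
import org.apache.hadoop.fs.{GlobFilter, Path}

// GlobFilter is not Serializable, so it cannot be built on the driver and
// shipped inside the task closure; construct it per file instead.
def matchesGlob(pathGlobPattern: String, fsPath: Path): Boolean = {
  if (pathGlobPattern.isEmpty) true
  else new GlobFilter(pathGlobPattern).accept(fsPath)
}
```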
```scala
val requiredColumns = GenerateUnsafeProjection.generate(requiredOutput, fullOutput)

val row = Row(Row(path, modificationTime, length), content)
```
Since the schema is simple, we can create `InternalRow` directly instead of creating `Row` and using `RowEncoder`. The string type should be `UTF8String`, and the timestamp type should be a long counting microseconds since January 1, 1970 UTC.
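A sketch of the suggested construction, wrapped in a hypothetical helper `toInternalRow`; `DateTimeUtils.fromMillis` is the conversion the reviewers settle on later in this thread.

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String

// Build the InternalRow directly: strings as UTF8String, timestamps as
// microseconds since 1970-01-01 UTC, matching Catalyst's internal formats.
def toInternalRow(path: String, content: Array[Byte], status: FileStatus): InternalRow =
  InternalRow(
    content,                                                // content: BinaryType
    InternalRow(
      UTF8String.fromString(path),                          // path: StringType
      DateTimeUtils.fromMillis(status.getModificationTime), // modificationTime: micros
      status.getLen))                                       // length: LongType
```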
```scala
    dataSchema: StructType,
    partitionSchema: StructType,
    requiredSchema: StructType,
    filters: Seq[Filter],
```
Are we going to leverage the filters here?

I can put it in a later PR.
HyukjinKwon left a comment: Sorry, but I have to say this: what's our plan for adding new data sources? Will they be external modules like Avro or Kafka, or is it decided case by case? For instance, do only data sources with rather complex dependencies go into external modules?
@HyukjinKwon Done. Thanks!
mengxr left a comment: Final pass.
```scala
 * See doc in `BinaryFileDataSource`
 */
val binaryFileSchema = StructType(
  StructField("content", BinaryType, false)::
```
Space before `::`. Also, just a note: we might keep this column nullable to handle potential I/O failures.
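A sketch of the schema with that suggestion applied, using the field names and types from the PR description (making `content` nullable is this reviewer's suggestion, not necessarily the merged code):

```scala
import org.apache.spark.sql.types._

// File metadata struct: path, modification time, and length.
val fileStatusSchema = StructType(
  StructField("path", StringType, nullable = false) ::
  StructField("modificationTime", TimestampType, nullable = false) ::
  StructField("length", LongType, nullable = false) :: Nil)

// `content` left nullable so a failed read could surface as null instead of an error.
val binaryFileSchema = StructType(
  StructField("content", BinaryType, nullable = true) ::
  StructField("status", fileStatusSchema, nullable = false) :: Nil)
```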
```scala
  content,
  InternalRow(
    UTF8String.fromString(path),
    DateTimeUtils.fromJavaTimestamp(modificationTime),
```
Is it more straightforward to use `fromMillis`?

Yes, we should use `DateTimeUtils.fromMillis(fileStatus.getModificationTime())`.
```scala
 * only include files with path matching the glob pattern.
 */
val pathGlobFilter: Option[String] = {
  val filter = parameters.getOrElse("pathGlobFilter", null)
```
Just `parameters.get("pathGlobFilter")` should work.
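That is, a one-line sketch of the simplification (assuming `parameters` is the usual `Map[String, String]` of options):

```scala
// Map#get already yields Option[String]; no null sentinel or wrapper block needed.
val pathGlobFilter: Option[String] = parameters.get("pathGlobFilter")
```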
Test build #104614 has finished for PR 24354 at commit

Test build #104613 has finished for PR 24354 at commit

retest this please
```scala
  requiredSchema.fieldNames.contains(a.name)
}

val requiredColumns = GenerateUnsafeProjection.generate(requiredOutput, fullOutput)
```
This does not help performance: we still read the file content even if the `content` column is not required.

This is OK for now; maybe we can leave a TODO and implement real column pruning in the future.
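For what that TODO might eventually look like, a speculative sketch that skips opening the stream when `content` is not required (identifiers `fs`, `fsPath`, and `requiredSchema` follow the quoted code; the guard itself is hypothetical):

```scala
import com.google.common.io.{ByteStreams, Closeables}

// Hypothetical pruning: only pay the I/O cost when `content` is actually requested.
val content: Array[Byte] =
  if (requiredSchema.fieldNames.contains("content")) {
    val stream = fs.open(fsPath)
    try ByteStreams.toByteArray(stream)
    finally Closeables.close(stream, true)
  } else {
    null // the projection drops this column anyway
  }
```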
Test build #104618 has finished for PR 24354 at commit
```scala
 * {{{
 *   // Scala
 *   val df = spark.read.format("binaryFile")
 *     .option("pathGlobFilter", "*.txt")
```
Nit: how about changing the extension in the example from `"*.txt"` to something like `*.png` or `*.jpg`?
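With that nit applied, the doc example would read roughly as follows (the load path is illustrative):

```scala
// Scala: read all PNG files under a directory into a DataFrame
val df = spark.read.format("binaryFile")
  .option("pathGlobFilter", "*.png")
  .load("/path/to/images")
```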
```scala
val path = file.filePath
val fsPath = new Path(path)

// TODO: Improve performance here: each file will recompile the glob pattern here.
```
I think we should make this a general option that can be applied to all data sources. We should also pass the option to `FileIndex`, so that Spark can split file partitions more precisely. We can have a follow-up PR for this.
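That follow-up landed as #24518 (see the commit message at the end of this thread). Once `pathGlobFilter` is a general option, usage looks like this for any file source (paths are illustrative):

```scala
// pathGlobFilter applied during path globbing on the driver, for any file source
val jsonOnly = spark.read.format("json")
  .option("pathGlobFilter", "*.json")
  .load("/data/mixed-dir")
```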
```scala
import org.apache.spark.util.SerializableConfiguration

private[binaryfile] class BinaryFileFormat extends FileFormat with DataSourceRegister {
```
As per https://issues.apache.org/jira/browse/SPARK-16964, I think we can remove `private[binaryfile]`.
```scala
  }
}

private[binaryfile] class BinaryFileSourceOptions(
```
Remove `private[binaryfile]` here as well.
@WeichenXu123 I sent you a PR at WeichenXu123#6 to address @HyukjinKwon's comment on the docs.
Test build #104634 has finished for PR 24354 at commit
Thanks, @WeichenXu123 and @mengxr, for bearing with me. I'm okay with this.
LGTM. Merged into master. I created two follow-up tasks:
Test build #104637 has finished for PR 24354 at commit
```scala
val stream = fs.open(fsPath)

val content = try {
  ByteStreams.toByteArray(stream)
```
If I remember correctly, the usual behavior in Spark is not to throw an exception but to prefer a null value. At this point, should we assign `content` a null value instead of throwing an exception?

Oh, we can control it with `ignoreCorruptFiles`.
```scala
val content = try {
  ByteStreams.toByteArray(stream)
} finally {
  Closeables.close(stream, true)
```
Related to the above comment: should we not propagate I/O exceptions?
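A sketch of how those two comments could combine, honoring the real `spark.sql.files.ignoreCorruptFiles` conf: swallow the read failure and emit a null `content` only when the flag is set. The surrounding reader shape, and the `ignoreCorruptFiles` boolean being in scope, are assumptions.

```scala
import java.io.IOException
import com.google.common.io.{ByteStreams, Closeables}

// When ignoreCorruptFiles is true, a failed read yields a null content cell
// instead of failing the task; otherwise the IOException propagates as usual.
val content: Array[Byte] =
  try {
    val stream = fs.open(fsPath)
    try ByteStreams.toByteArray(stream)
    finally Closeables.close(stream, true) // second arg swallows close-time errors
  } catch {
    case _: IOException if ignoreCorruptFiles => null
  }
```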
| StructField("status", fileStatusSchema, false) :: Nil) | ||
| } | ||
|
|
||
| class BinaryFileSourceOptions( |
Not a big deal at all, but let me leave a note before I forget: rename `BinaryFileSourceOptions` to `BinaryFileOptions`, to be consistent with the `[SourceName]Options` pattern: `TextOptions`, `OrcOptions`, `ParquetOptions`, `CSVOptions`, `JDBCOptions`, `ImageOptions`, etc.
…or all file sources

## What changes were proposed in this pull request?

### Background
The data source option `pathGlobFilter` was introduced for the binary file format in #24354. It can be used for filtering file names, e.g. reading only `.png` files while `.json` files sit in the same directory.

### Proposal
Make the option `pathGlobFilter` a general option for all file sources. The path filtering should happen during path globbing on the driver.

### Motivation
Filtering file path names in file scan tasks on executors is kind of ugly.

### Impact
1. The splitting of file partitions will be more balanced.
2. The metrics of file scans will be more accurate.
3. Users can use the option when reading other file sources.

## How was this patch tested?

Unit tests.

Closes #24518 from gengliangwang/globFilter.

Authored-by: Gengliang Wang <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
## What changes were proposed in this pull request?

Implement a binary file data source in Spark.

Format name: "binaryFile" (case-insensitive)

Schema:
- content: BinaryType
- status: StructType
  - path: StringType
  - modificationTime: TimestampType
  - length: LongType

Options:
- pathGlobFilter (instead of pathFilterRegex), to rely on GlobFilter behavior
- maxBytesPerPartition is not implemented, since partition sizing is already controlled by two SQL confs: maxPartitionBytes and openCostInBytes.

## How was this patch tested?

Unit test added.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes apache#24354 from WeichenXu123/binary_file_datasource.

Lead-authored-by: WeichenXu <[email protected]>
Co-authored-by: Xiangrui Meng <[email protected]>
Signed-off-by: Xiangrui Meng <[email protected]>