[SPARK-27990][SQL][ML] Provide a way to recursively load data from datasource #24830
Conversation
Test build #106348 has finished for PR 24830 at commit
Test build #106570 has finished for PR 24830 at commit
Jenkins, retest this please.
Test build #106583 has finished for PR 24830 at commit
@cloud-fan @gengliangwang It would be great if you could make a pass.
```scala
val selectedPartitions = if (partitionSpec().partitionColumns.isEmpty) {
  PartitionDirectory(InternalRow.empty, allFiles().filter(isNonEmptyFile)) :: Nil
} else {
  if (recursiveFileLookup) {
    // The hunk is truncated in the original diff; this branch rejects
    // recursive lookup when an explicit partition spec is present.
    throw new IllegalArgumentException(
      "Datasource with partition do not allow recursive file loading.")
  }
  // ... rest of the partitioned branch elided in the diff
}
```
This branch seems not reachable. Should we simply use assert here?
Oh, it is reachable, I think. See class PrunedInMemoryFileIndex, which explicitly sets the partitionSpec.
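To make the point concrete, here is a minimal standalone Scala sketch of the guard being discussed; the method name is hypothetical and this is not the actual PartitioningAwareFileIndex code:

```scala
// Because an index such as PrunedInMemoryFileIndex can set the partition
// spec explicitly, this combination is reachable from user input, so it
// warrants a user-facing exception rather than an internal assert.
def validateListing(partitionColumns: Seq[String], recursiveFileLookup: Boolean): Unit = {
  if (partitionColumns.nonEmpty && recursiveFileLookup) {
    throw new IllegalArgumentException(
      "Datasource with partition do not allow recursive file loading.")
  }
}
```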
Nit: Please update the PR description.
Test build #106621 has finished for PR 24830 at commit
@cloud-fan @gengliangwang PR updated. Thanks!
Test build #106630 has finished for PR 24830 at commit
gengliangwang left a comment: LGTM except one comment
Test build #106648 has finished for PR 24830 at commit
Test build #106656 has finished for PR 24830 at commit
retest this please.
Test build #106665 has finished for PR 24830 at commit
retest this please.
Test build #106672 has finished for PR 24830 at commit
thanks, merging to master!
```scala
// Per-source option, defaulting to false.
protected lazy val recursiveFileLookup = {
  parameters.getOrElse("recursiveFileLookup", "false").toBoolean
}
```
shall we document the option in DataFrameReader?
@Ngone51 Could you submit a follow-up PR to document this? This affects all the built-in file sources. We need to update the documentation of both PySpark and Scala APIs.
FYI, there is a Jira about adding this documentation which you will want to reference: SPARK-29903
@nchammas Could you submit a PR to fix readwriter.py for supporting this new option?
Sure, will do. I suppose we'll do that separately from adding the docs, which will get their own PR, correct?
Guys, we should also update DataStreamReader and streaming.py.
Ok, I'll submit a PR to document it. @gatorsmile
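For concreteness, a hedged Scala sketch of setting the option through the batch and streaming readers discussed in this thread; the paths are hypothetical:

```scala
// Batch read: the option is parsed by the file index as shown above.
val batchDf = spark.read
  .option("recursiveFileLookup", "true")
  .text("/data/in")

// Streaming read: per the comment above about DataStreamReader and
// streaming.py, the same option name is assumed to apply to file
// streaming sources as well.
val streamDf = spark.readStream
  .option("recursiveFileLookup", "true")
  .text("/data/in")
```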
Sorry for the late revisit and post-hoc review. There's no documentation for this feature, so I feel it's better to ask here: what's the expected behavior if we have a glob in the source path?

Btw, I guess this is related to SPARK-20568 (#22952): previously we could bound the possible depth of source files based on the source path (even if it contains a glob) to avoid checking the pattern, and this patch may expand that upper bound to infinity. That's not a problem, but I just wanted to confirm my understanding is correct.
…Python DataFrameReader

### What changes were proposed in this pull request?

As a follow-up to #24830, this PR adds the `recursiveFileLookup` option to the Python DataFrameReader API.

### Why are the changes needed?

This PR maintains Python feature parity with Scala.

### Does this PR introduce any user-facing change?

Yes. Before this PR, you'd only be able to use this option as follows:

```python
spark.read.option("recursiveFileLookup", True).text("test-data").show()
```

With this PR, you can reference the option from within the format-specific method:

```python
spark.read.text("test-data", recursiveFileLookup=True).show()
```

This option now also shows up in the Python API docs.

### How was this patch tested?

I tested this manually by creating the following directories with dummy data:

```
test-data
├── 1.txt
└── nested
    └── 2.txt
test-parquet
├── nested
│   ├── _SUCCESS
│   ├── part-00000-...-.parquet
├── _SUCCESS
├── part-00000-...-.parquet
```

I then ran the following tests and confirmed the output looked good:

```python
spark.read.parquet("test-parquet", recursiveFileLookup=True).show()
spark.read.text("test-data", recursiveFileLookup=True).show()
spark.read.csv("test-data", recursiveFileLookup=True).show()
```

`python/pyspark/sql/tests/test_readwriter.py` seems pretty sparse. I'm happy to add my tests there, though it seems we have been deferring testing like this to the Scala side of things.

Closes #26718 from nchammas/SPARK-27990-recursiveFileLookup-python.

Authored-by: Nicholas Chammas <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
…p' and 'pathGlobFilter' in file sources, 'mergeSchema' in ORC

### What changes were proposed in this pull request?

This PR documents the options 'recursiveFileLookup' and 'pathGlobFilter' in file sources, and 'mergeSchema' in ORC.

- `recursiveFileLookup` at file sources: #24830 ([SPARK-27990](https://issues.apache.org/jira/browse/SPARK-27990))
- `pathGlobFilter` at file sources: #24518 ([SPARK-27627](https://issues.apache.org/jira/browse/SPARK-27627))
- `mergeSchema` at ORC: #24043 ([SPARK-11412](https://issues.apache.org/jira/browse/SPARK-11412))

**Note that** the `timeZone` option was not moved from `DataFrameReader.options`, as I assume it will likely affect other datasources as well once DSv2 is complete.

### Why are the changes needed?

To document the available options in sources properly.

### Does this PR introduce any user-facing change?

In PySpark, `pathGlobFilter` can be set via `DataFrameReader.(text|orc|parquet|json|csv)` and `DataStreamReader.(text|orc|parquet|json|csv)`.

### How was this patch tested?

Manually built the doc and checked the output. The option setting in PySpark is rather a logical change. I manually tested one only:

```bash
$ ls -al tmp
...
-rw-r--r--  1 hyukjin.kwon  staff  3 Dec 20 12:19 aa
-rw-r--r--  1 hyukjin.kwon  staff  3 Dec 20 12:19 ab
-rw-r--r--  1 hyukjin.kwon  staff  3 Dec 20 12:19 ac
-rw-r--r--  1 hyukjin.kwon  staff  3 Dec 20 12:19 cc
```

```python
>>> spark.read.text("tmp", pathGlobFilter="*c").show()
```

```
+-----+
|value|
+-----+
|   ac|
|   cc|
+-----+
```

Closes #26958 from HyukjinKwon/doc-followup.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
…tasource

Provide a way to recursively load data from datasource.

I add a "recursiveFileLookup" option. When the "recursiveFileLookup" option is turned on, partition inference is turned off and all files from the directory are loaded recursively. If a datasource explicitly specifies the partitionSpec and the user turns on the "recursiveFileLookup" option, an exception is thrown.

Unit tests.

Please review https://spark.apache.org/contributing.html before opening a pull request.

Closes apache#24830 from WeichenXu123/recursive_ds.

Authored-by: WeichenXu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
Provide a way to recursively load data from datasource.
I add a "recursiveFileLookup" option.
When "recursiveFileLookup" option turn on, then partition inferring is turned off and all files from the directory will be loaded recursively.
If some datasource explicitly specify the partitionSpec, then if user turn on "recursive" option, then exception will be thrown.
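For illustration, a minimal Scala usage sketch; the directory layout and path below are hypothetical, not taken from this PR:

```scala
// Given a nested layout such as:
//   /data/logs/1.txt
//   /data/logs/nested/2.txt
// both files are loaded and partition inference is skipped.
val df = spark.read
  .option("recursiveFileLookup", "true")
  .text("/data/logs")
```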
How was this patch tested?
Unit tests.
Please review https://spark.apache.org/contributing.html before opening a pull request.