# [SPARK-11412][SQL] Support merge schema for ORC #24043
## Conversation
Force-pushed 390639c to f7532a4.
@dongjoon-hyun Could you please review this PR?

Retest this please.
What happens for `OrcDataSourceV2`? It's used, isn't it?
Test build #105233 has finished for PR 24043 at commit …
Please move this test case to `OrcSuite`. Then, both `OrcSourceSuite` and `HiveOrcSourceSuite` will test the `native` and `hive` ORC implementations respectively.
And, remove this `OrcUtilsSuite` test suite.
Remove this test suite, please.
dongjoon-hyun left a comment:
Hi, @WangGuangxin. Sorry for the late response. I left a few comments.
@dongjoon-hyun Thanks for your review. I've updated it according to your comments.

Retest this please.

Test build #105665 has finished for PR 24043 at commit …
The build failure was because the type of …

Retest this please.

Test build #105689 has finished for PR 24043 at commit …
Kindly ping @dongjoon-hyun.

Sorry for the delay, @WangGuangxin.

Retest this please.
```scala
}

/**
 * Read single ORC file schema using native version of ORC
```
Please add the following line because this is only used in this file and `OrcSourceSuite`:

`* This is visible for testing.`
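Applied to the declaration quoted above, the suggested doc line would read roughly like this (a sketch only; the body is elided):

```scala
/**
 * Read single ORC file schema using native version of ORC.
 * This is visible for testing.
 */
def singleFileSchemaReader(file: String, conf: Configuration, ignoreCorruptFiles: Boolean)
  : Option[StructType] = {
  // ... existing body unchanged ...
}
```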
```scala
}

def inferSchema(sparkSession: SparkSession, files: Seq[FileStatus], options: Map[String, String])
    : Option[StructType] = {
```
So, this function is used in `native` ORC readers (`OrcFileFormat`/`OrcTable`), and `hive` `OrcFileFormat` has its own implementation of `inferSchema`, right?
> So, this function is used in `native` ORC readers (`OrcFileFormat`/`OrcTable`), and `hive` `OrcFileFormat` has its own implementation of `inferSchema`, right?

Yes. Do you think it's necessary to refactor this function?
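For context, here is a minimal sketch of that dispatch, assuming the signatures shown in the quoted diff context (the bodies are simplified stubs, and it leans on Spark-internal helpers such as `SerializableConfiguration` and `StructType#merge`, so treat it as an illustration rather than the PR's actual code):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

object OrcSchemaInferenceSketch extends Serializable {

  // Reads one ORC file's schema with the native reader; the real body is elided here.
  def singleFileSchemaReader(
      file: String,
      conf: Configuration,
      ignoreCorruptFiles: Boolean): Option[StructType] = None

  def inferSchema(
      sparkSession: SparkSession,
      files: Seq[FileStatus],
      options: Map[String, String]): Option[StructType] = {
    // The per-read "mergeSchema" option wins over the session conf, per the PR description.
    val shouldMerge = options.get("mergeSchema").map(_.toBoolean)
      .getOrElse(sparkSession.conf.get("spark.sql.orc.mergeSchema", "false").toBoolean)

    if (shouldMerge) {
      // Read every file's schema in a Spark job, then fold the results together.
      val serConf = new SerializableConfiguration(sparkSession.sessionState.newHadoopConf())
      val schemas = sparkSession.sparkContext
        .parallelize(files.map(_.getPath.toString))
        .flatMap(path => singleFileSchemaReader(path, serConf.value, ignoreCorruptFiles = true))
        .collect()
      // StructType#merge is private[sql], so this only compiles inside Spark's sql packages.
      schemas.reduceOption(_ merge _)
    } else {
      // Previous behavior: pick a single file and use its schema as-is.
      files.headOption.flatMap { f =>
        singleFileSchemaReader(
          f.getPath.toString, sparkSession.sessionState.newHadoopConf(), ignoreCorruptFiles = true)
      }
    }
  }
}
```

The `(String, Configuration, Boolean) => Option[StructType]` parameter that appears later in the diff suggests the parallel-merge part is factored into a shared helper, so the `hive` implementation can plug in its own single-file reader.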
```scala
 * Read single ORC file schema using native version of ORC
 */
def singleFileSchemaReader(file: String, conf: Configuration, ignoreCorruptFiles: Boolean)
    : Option[StructType] = {
```
Unfortunately, the existing code around here follows a wrong indentation rule. Let's use correct indentation, at least in new code. `: Option[StructType]` should have 2-space indentation instead of 4-space.

```diff
  def singleFileSchemaReader(file: String, conf: Configuration, ignoreCorruptFiles: Boolean)
-      : Option[StructType] = {
+    : Option[StructType] = {
```

```scala
    sparkSession: SparkSession,
    files: Seq[FileStatus],
    singleFileSchemaReader: (String, Configuration, Boolean) => Option[StructType])
    : Option[StructType] = {
```
ditto. 2-space.
```scala
 * Read single ORC file schema using Hive ORC library
 */
def singleFileSchemaReader(file: String, conf: Configuration, ignoreCorruptFiles: Boolean)
    : Option[StructType] = {
```
ditto. 2-space.
dongjoon-hyun left a comment:
Also, please add the following at `ReadSchemaSuite`.

```diff
@@ -32,6 +32,7 @@ import org.apache.spark.sql.internal.SQLConf
  *
  *   -> OrcReadSchemaSuite
  *   -> VectorizedOrcReadSchemaSuite
+ *   -> MergedOrcReadSchemaSuite
@@ -134,6 +135,31 @@ class VectorizedOrcReadSchemaSuite
   }
 }
+
+class MergedOrcReadSchemaSuite
+  extends ReadSchemaSuite
+  with AddColumnIntoTheMiddleTest
+  with HideColumnInTheMiddleTest
+  with AddNestedColumnTest
+  with HideNestedColumnTest
+  with ChangePositionTest
+  with BooleanTypeTest
+  with IntegralTypeTest
+  with ToDoubleTypeTest {
+
+  override val format: String = "orc"
+
+  override def beforeAll() {
+    super.beforeAll()
+    originalConf = spark.conf.get(SQLConf.ORC_SCHEMA_MERGING_ENABLED)
+    spark.conf.set(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key, "true")
+  }
+
+  override def afterAll() {
+    spark.conf.set(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key, originalConf)
+    super.afterAll()
+  }
+}
+
 class ParquetReadSchemaSuite
```
Lastly, it would be great if you could add some performance comparisons between Parquet and ORC schema merging in the PR description. This PR aims to add a new feature for ORC/Parquet feature parity, so a big slowdown in the new code would not be desirable.
Test build #105853 has finished for PR 24043 at commit …

Test build #106916 has finished for PR 24043 at commit …
```scala
}

test("SPARK-11412 test enabling/disabling schema merging with data type conflicts") {
  def testSchemaMergingWithDataTypeConflicts(expectedColumnNumber: Int): Unit = {
```
Nit: I don't think we need to make this a function. We can do it like this:

```scala
withTempDir { dir =>
  spark.range(0, 10).toDF("a").write...
  withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "true") {
    spark.read...
  }
  withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "false") {
    spark.read...
  }
}
```

So the test case doesn't need to write the duplicated files twice.
yes, agree.
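For reference, a filled-in version of that shape could look like the following sketch (assumed to live inside an ORC test suite with `testImplicits`, `withTempDir`, and `withSQLConf` in scope; the writes and the expected failure mode are illustrative, not the PR's final test):

```scala
withTempDir { dir =>
  import testImplicits._
  val basePath = dir.getCanonicalPath

  // Two partition directories whose column "a" has conflicting types (bigint vs. string).
  spark.range(0, 10).toDF("a").write.orc(s"$basePath/foo=1")
  spark.range(0, 10).map(_.toString).toDF("a").write.orc(s"$basePath/foo=2")

  withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "true") {
    // With merging enabled, the type conflict should surface during schema inference.
    intercept[org.apache.spark.SparkException] {
      spark.read.orc(basePath)
    }
  }
  withSQLConf(SQLConf.ORC_SCHEMA_MERGING_ENABLED.key -> "false") {
    // With merging disabled, one file's schema is picked and the read succeeds.
    spark.read.orc(basePath)
  }
}
```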
Test build #106921 has finished for PR 24043 at commit …

Test build #106918 has finished for PR 24043 at commit …
Retest it please.

@gengliangwang Could you please make Jenkins retest this? Thanks.
Retest this please.

Test build #106987 has finished for PR 24043 at commit …

Retest this please.

Jenkins always restarts at this time...

Test build #106988 has finished for PR 24043 at commit …
@WangGuangxin Could you submit a follow-up PR to update the documentation? See the example: https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#schema-merging

Thanks! Merged to master.

Thanks for your work, @WangGuangxin! What is your JIRA account? We need to set the assignee field to your JIRA account: https://issues.apache.org/jira/browse/SPARK-11412

Thanks. My JIRA account is EdisonWang.

OK.
## What changes were proposed in this pull request?
Currently, ORC's `inferSchema` is implemented as randomly choosing one ORC file and reading its schema.
This PR follows the behavior of Parquet: it implements schema-merging logic by reading the schemas of all ORC files in parallel through a Spark job.
Users can enable schema merging via `spark.read.orc("xxx").option("mergeSchema", "true")` or by setting `spark.sql.orc.mergeSchema` to `true`; the former has higher priority.
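For example (a minimal usage sketch; the path is a placeholder):

```scala
// Per-read option -- takes precedence over the session configuration:
val merged = spark.read.option("mergeSchema", "true").orc("/path/to/orc")

// Session-wide default:
spark.conf.set("spark.sql.orc.mergeSchema", "true")
val mergedToo = spark.read.orc("/path/to/orc")
```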
## How was this patch tested?
Tested by the unit test suite `OrcUtilsSuite.scala`.
Closes apache#24043 from WangGuangxin/SPARK-11412.
Lead-authored-by: wangguangxin.cn <[email protected]>
Co-authored-by: wangguangxin.cn <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
HyukjinKwon left a comment.
@HyukjinKwon - Sure, will do.
… APIs

### What changes were proposed in this pull request?
This PR is a follow-up to #24043 and cousin of #26730. It exposes the `mergeSchema` option directly in the ORC APIs.

### Why are the changes needed?
So the Python API matches the Scala API.

### Does this PR introduce any user-facing change?
Yes, it adds a new option directly in the ORC reader method signatures.

### How was this patch tested?
I tested this manually as follows:

```
>>> spark.range(3).write.orc('test-orc')
>>> spark.range(3).withColumnRenamed('id', 'name').write.orc('test-orc/nested')
>>> spark.read.orc('test-orc', recursiveFileLookup=True, mergeSchema=True)
DataFrame[id: bigint, name: bigint]
>>> spark.read.orc('test-orc', recursiveFileLookup=True, mergeSchema=False)
DataFrame[id: bigint]
>>> spark.conf.set('spark.sql.orc.mergeSchema', True)
>>> spark.read.orc('test-orc', recursiveFileLookup=True)
DataFrame[id: bigint, name: bigint]
>>> spark.read.orc('test-orc', recursiveFileLookup=True, mergeSchema=False)
DataFrame[id: bigint]
```

Closes #26755 from nchammas/SPARK-30113-ORC-mergeSchema.

Authored-by: Nicholas Chammas <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
…p' and 'pathGlobFilter' in file sources 'mergeSchema' in ORC

### What changes were proposed in this pull request?
This PR adds and exposes the options 'recursiveFileLookup' and 'pathGlobFilter' in file sources, and 'mergeSchema' in ORC, into the documentation.

- `recursiveFileLookup` at file sources: #24830 ([SPARK-27627](https://issues.apache.org/jira/browse/SPARK-27627))
- `pathGlobFilter` at file sources: #24518 ([SPARK-27990](https://issues.apache.org/jira/browse/SPARK-27990))
- `mergeSchema` at ORC: #24043 ([SPARK-11412](https://issues.apache.org/jira/browse/SPARK-11412))

**Note that** the `timeZone` option was not moved from `DataFrameReader.options`, as I assume it will likely affect other datasources as well once DSv2 is complete.

### Why are the changes needed?
To document the available options in sources properly.

### Does this PR introduce any user-facing change?
In PySpark, `pathGlobFilter` can be set via `DataFrameReader.(text|orc|parquet|json|csv)` and `DataStreamReader.(text|orc|parquet|json|csv)`.

### How was this patch tested?
Manually built the doc and checked the output. Option setting in PySpark is rather a logical change. I manually tested one only:

```bash
$ ls -al tmp
...
-rw-r--r--  1 hyukjin.kwon  staff  3 Dec 20 12:19 aa
-rw-r--r--  1 hyukjin.kwon  staff  3 Dec 20 12:19 ab
-rw-r--r--  1 hyukjin.kwon  staff  3 Dec 20 12:19 ac
-rw-r--r--  1 hyukjin.kwon  staff  3 Dec 20 12:19 cc
```

```python
>>> spark.read.text("tmp", pathGlobFilter="*c").show()
```

```
+-----+
|value|
+-----+
|   ac|
|   cc|
+-----+
```

Closes #26958 from HyukjinKwon/doc-followup.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>