[SPARK-32234][SQL] Spark sql commands are failing on selecting the orc tables #29045
Conversation
dongjoon-hyun left a comment:
Thank you for your contribution, @SaurabhChawla100. To prevent a future regression, could you add a UT with your example, please?
ok to test
Test build #125398 has finished for PR 29045 at commit
Retest this please.
Test build #125411 has finished for PR 29045 at commit
Sure, I will add the unit test.
Test build #125502 has finished for PR 29045 at commit
Retest this please.
Test build #125520 has finished for PR 29045 at commit
retest this please
```
assert(error == null)
spark.sql(s"DROP TABLE IF EXISTS test_date_spark_orc")
}
```
nit: remove this blank.
done
```
  }
}

test("orc data created by the hive tables having _col fields name") {
```
Please add the prefix: test("SPARK-32234: orc data....
done
```
  spark.sql(s"DROP TABLE IF EXISTS test_date_hive_orc")
}

test("orc data created by the spark having proper fields name") {
```
ditto
done
```
  error = e
}
assert(error == null)
spark.sql(s"DROP TABLE IF EXISTS test_date_hive_orc")
```
Please check the other tests carefully, then follow how tests are written there. How about refactoring it like this?
```
withTable("test_date_hive_orc") {
  spark.sql(
    s"""
      |CREATE TABLE test_date_hive_orc
      | (col1 INT, col2 STRING, col3 INT)
      | USING orc
    """.stripMargin)
  spark.sql(
    s"""
      |INSERT INTO test_date_hive_orc VALUES
      | (9, '12', 2020)
    """.stripMargin)
  val df = spark.sql("SELECT col2 FROM test_date_hive_orc")
  checkAnswer(df, Row(...))
}
```
Refactored the unit test
Test build #125555 has finished for PR 29045 at commit
Test build #125593 has started for PR 29045 at commit
```
assert(error == null)
spark.sql(
  s"""
     |DROP TABLE IF
```
You don't need to drop the table. Please see the implementation of withTable.
Yes, it's handled in withTable; removed this drop table.
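For reference, withTable in Spark's SQLTestUtils is essentially a try/finally wrapper that drops the tables afterwards. A simplified sketch (assumed shape, not the exact Spark source):

```
trait SQLTestUtilsSketch {
  def spark: org.apache.spark.sql.SparkSession

  // Run the test body, then always drop the given tables, even if the body throws.
  def withTable(tableNames: String*)(f: => Unit): Unit = {
    try f finally {
      tableNames.foreach { name =>
        spark.sql(s"DROP TABLE IF EXISTS $name")
      }
    }
  }
}
```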
```
case e: Throwable =>
  error = e
}
assert(error == null)
```
I think you don't need this check.
Yes, it's not required; this is already handled by checkAnswer in the test framework. I have removed it.
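For context, checkAnswer fails the test itself on any exception or result mismatch, so no manual error tracking is needed. A minimal usage sketch (the expected value follows the '12' inserted in the suggestion above):

```
// checkAnswer executes the query and fails the test on an exception or a
// result mismatch, replacing the try/catch + assert(error == null) pattern.
val df = spark.sql("SELECT col2 FROM test_date_hive_orc")
checkAnswer(df, Row("12"))
```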
Test build #125668 has finished for PR 29045 at commit
```
}

test("SPARK-32234: orc data created by the hive tables having _col fields name") {
  var error: Throwable = null
```
please remove this.
done
```
withTable("test_date_hive_orc") {
  spark.sql(
    """
      |CREATE TABLE `test_date_hive_orc`
```
nit: do we need the backquotes?
not required
```
      | values(9, '12', 2020)
    """.stripMargin)

val df = spark.sql("select d_date_id from test_date_spark_orc")
```
nit: please use uppercase for SQL keywords (e.g., SELECT) where possible.
Done
```
// in the physical schema, there is a need to send the
// entire dataSchema instead of required schema
val orcFieldNames = reader.getSchema.getFieldNames.asScala
if (orcFieldNames.forall(_.startsWith("_col"))) {
```
What does this code mean? Does _col need to be hard-coded?
This is for an ORC file written by Hive, which has no field names in the physical schema; in that case the fields get names like _col1, _col2, etc. Check this code for reference:
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcUtils.scala, line 133 at 84db660:
```
if (orcFieldNames.forall(_.startsWith("_col"))) {
```
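For illustration, a standalone sketch of the check (the field names below are typical of Hive-written files, not read from a real file):

```
// Hive-written ORC files carry only positional field names in the physical
// schema, e.g. struct<_col0:int,_col1:string,_col2:int>, instead of the
// catalog column names. The check detects that case:
val orcFieldNames = Seq("_col0", "_col1", "_col2")
val hiveWritten = orcFieldNames.forall(_.startsWith("_col")) // true
// When true, columns must be resolved by position against the full data
// schema rather than by name, so name-based column pruning is unsafe.
```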
Test build #125675 has finished for PR 29045 at commit
Thank you for updating, but could you update the PR description with a reproducible example? If someone follows the current example, they cannot reproduce it because they don't have the data in /Users/test/tpcds_scale5data/date_dim. Also, please remove irrelevant stuff like TBLPROPERTIES.
```
  }
}

test("SPARK-32234: orc data created by the spark having proper fields name") {
```
Shall we remove this test case, since it passes without your patch? We already have test coverage for this.
done
Test build #125850 has finished for PR 29045 at commit
Test build #125851 has finished for PR 29045 at commit
Retest this please
```
  isCaseSensitive, dataSchema, requiredSchema, reader, conf)
}

if (!canPruneCols) {
```
nit: we can simplify the code a bit:
```
val resultSchemaString = if (canPruneCols) {
  OrcUtils.orcTypeDescriptionString(resultSchema)
} else {
  OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
}
```
Then we don't need to keep the `val actualSchema = ...` and `var resultSchemaString = ...` at the beginning.
done
```
   reader: Reader,
-  conf: Configuration): Option[Array[Int]] = {
+  conf: Configuration): (Option[Array[Int]], Boolean) = {
+  var canPruneCols = true
```
Do we really need it? We can just use a boolean literal in the places that return the value.
Removed it
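A sketch of what the suggestion amounts to (the method name and the id computation here are stand-ins, not the real OrcUtils logic):

```
// Return the boolean literal at each exit point instead of threading a
// mutable `var canPruneCols` through the method.
def requestedColumnIdsShape(orcFieldNames: Seq[String]): (Seq[Int], Boolean) = {
  val ids = orcFieldNames.indices // stand-in for the real id resolution
  if (orcFieldNames.forall(_.startsWith("_col"))) {
    (ids, false) // positional names: pruning is unsafe
  } else {
    (ids, true)  // real field names: pruning is safe
  }
}
```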
```
withSQLConf(
  SQLConf.ORC_IMPLEMENTATION.key -> orcImpl,
  SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vectorized) {
  withTempPath { dir =>
```
nit: we don't need to provide a custom location; CREATE TABLE without a LOCATION clause can also reproduce it.
Removed it
```
/**
 * Returns the requested column ids from the given ORC file. Column id can be -1, which means the
 * requested column doesn't exist in the ORC file. Returns None if the given ORC file is empty.
 * @return Returns the requested column ids from the given ORC file and Boolean flag to use actual
```
can we update the comment a little bit?
updated the comment
```
} else {
  OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
}
OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
```
To avoid duplicated code, we can move this code into the requestedColumnIds method:
```
def requestedColumnIds(..., partitionSchema: StructType): Option[Array[Int]] = {
  ...
  if (orcFieldNames.isEmpty) {
    None
  } else if (orcFieldNames.forall(_.startsWith("_col"))) {
    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf,
      OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields)))
    ...
  } else {
    OrcConf.MAPRED_INPUT_SCHEMA.setString(conf,
      OrcUtils.orcTypeDescriptionString(StructType(requiredSchema.fields ++ partitionSchema.fields)))
    ...
  }
}
```
@cloud-fan - In that case we need to return the resultSchemaString from this method as Option[(Array[Int], String)], i.e.:
```
} else if (orcFieldNames.forall(_.startsWith("_col"))) {
  val resultSchemaString = OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
} else {
  val resultSchemaString = OrcUtils.orcTypeDescriptionString(StructType(requiredSchema.fields ++ partitionSchema.fields))
}
```
since we are using this resultSchemaString in:
```
batchReader.initBatch(
  TypeDescription.fromString(resultSchemaString),
  resultSchema.fields,
```
Shall we make this change, or create a helper method in OrcUtils from this code?
```
val resultSchemaString = someMethod()

def someMethod(): String = {
  val resultSchemaString = if (canPruneCols) {
    OrcUtils.orcTypeDescriptionString(resultSchema)
  } else {
    OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
  }
  OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
  resultSchemaString
}
```
adding a new helper method is also good.
Added the helper method
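For reference, a sketch of the resulting helper (the signature is inferred from the call site quoted later in this review; the merged implementation may differ in detail):

```
import org.apache.hadoop.conf.Configuration
import org.apache.orc.OrcConf
import org.apache.spark.sql.types.StructType

// Pick the result schema string based on canPruneCols, publish it to the
// ORC reader via the Hadoop conf, and return it for initBatch.
def orcResultSchemaString(
    canPruneCols: Boolean,
    dataSchema: StructType,
    resultSchema: StructType,
    partitionSchema: StructType,
    conf: Configuration): String = {
  val resultSchemaString = if (canPruneCols) {
    OrcUtils.orcTypeDescriptionString(resultSchema)
  } else {
    OrcUtils.orcTypeDescriptionString(StructType(dataSchema.fields ++ partitionSchema.fields))
  }
  OrcConf.MAPRED_INPUT_SCHEMA.setString(conf, resultSchemaString)
  resultSchemaString
}
```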
```
  }
}

test("SPARK-32234: orc data created by the hive tables having _col fields name" +
```
we can shorten the test name: SPARK-32234: read ORC table with column names all starting with '_col'
done
cloud-fan left a comment:
LGTM except some code style issues. Thanks for the fix!
```
val (requestedColIds, canPruneCols) = resultedColPruneInfo.get
val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
  dataSchema, resultSchema, partitionSchema, conf)
val requestedDataColIds = requestedColIds ++ Array.fill(partitionSchema.length)(-1)
```
I think we should switch the names: here we add the partition column IDs, so it's better to call this one requestedColIds. The former can be called requestedDataColIds, as it doesn't contain partition columns.
Switched the names.
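After the rename, the diff context quoted above would read roughly:

```
// Ids for data columns only, as returned by requestedColumnIds:
val (requestedDataColIds, canPruneCols) = resultedColPruneInfo.get
val resultSchemaString = OrcUtils.orcResultSchemaString(canPruneCols,
  dataSchema, resultSchema, partitionSchema, conf)
// Partition columns are appended with id -1, so this is the full id list:
val requestedColIds = requestedDataColIds ++ Array.fill(partitionSchema.length)(-1)
```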
Test build #125892 has finished for PR 29045 at commit
Test build #125907 has finished for PR 29045 at commit
Test build #125949 has finished for PR 29045 at commit
@cloud-fan / @dongjoon-hyun - The test build is failing with the following error; I am not sure if it's related to the change in this PR.
All the GitHub Actions checks passed; I think it's good to go. Merging to master/3.0!
[SPARK-32234][SQL] Spark sql commands are failing on selecting the orc tables
### What changes were proposed in this pull request?
Spark SQL commands are failing when selecting from ORC tables.
Steps to reproduce:
Example 1 -
Prerequisite: the ORC data at /Users/test/tpcds_scale5data/date_dim was generated by Hive.
```
val table = """CREATE TABLE `date_dim` (
`d_date_sk` INT,
`d_date_id` STRING,
`d_date` TIMESTAMP,
`d_month_seq` INT,
`d_week_seq` INT,
`d_quarter_seq` INT,
`d_year` INT,
`d_dow` INT,
`d_moy` INT,
`d_dom` INT,
`d_qoy` INT,
`d_fy_year` INT,
`d_fy_quarter_seq` INT,
`d_fy_week_seq` INT,
`d_day_name` STRING,
`d_quarter_name` STRING,
`d_holiday` STRING,
`d_weekend` STRING,
`d_following_holiday` STRING,
`d_first_dom` INT,
`d_last_dom` INT,
`d_same_day_ly` INT,
`d_same_day_lq` INT,
`d_current_day` STRING,
`d_current_week` STRING,
`d_current_month` STRING,
`d_current_quarter` STRING,
`d_current_year` STRING)
USING orc
LOCATION '/Users/test/tpcds_scale5data/date_dim'"""
spark.sql(table).collect
val u = """select date_dim.d_date_id from date_dim limit 5"""
spark.sql(u).collect
```
Example 2
```
val table = """CREATE TABLE `test_orc_data` (
`_col1` INT,
`_col2` STRING,
`_col3` INT)
USING orc"""
spark.sql(table).collect
spark.sql("insert into test_orc_data values(13, '155', 2020)").collect
val df = """select _col2 from test_orc_data limit 5"""
spark.sql(df).collect
```
It fails with the error below:
```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, 192.168.0.103, executor driver): java.lang.ArrayIndexOutOfBoundsException: 1
at org.apache.spark.sql.execution.datasources.orc.OrcColumnarBatchReader.initBatch(OrcColumnarBatchReader.java:156)
at org.apache.spark.sql.execution.datasources.orc.OrcFileFormat.$anonfun$buildReaderWithPartitionValues$7(OrcFileFormat.scala:258)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:141)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:203)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:116)
at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:620)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:729)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:343)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:895)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:895)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:372)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:336)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:133)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:445)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1489)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:448)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
```
The reason is that initBatch does not receive the schema it needs to resolve the column values in OrcFileFormat.scala:
```
batchReader.initBatch(
TypeDescription.fromString(resultSchemaString)
```
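To make the failure mode concrete, here is a hypothetical illustration of the index mismatch (simplified, with invented variable names; not Spark internals):

```
// The column id for _col2 was resolved positionally against the full
// 3-column file schema, but the batch was sized from the pruned 1-column
// read schema, so the id points past the end of the vector array.
val requestedColIds = Array(1)    // _col2 is the 2nd physical column
val prunedFields = Array("_col2") // pruned read schema has 1 field
val vectors = new Array[AnyRef](prunedFields.length)
vectors(requestedColIds(0))       // throws ArrayIndexOutOfBoundsException: 1
```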
### Why are the changes needed?
Spark SQL queries on ORC tables are failing.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
A unit test is added for this. Also tested the failing queries through spark-shell and spark-submit.
Closes #29045 from SaurabhChawla100/SPARK-32234.
Lead-authored-by: SaurabhChawla <[email protected]>
Co-authored-by: SaurabhChawla <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 6be8b93)
Signed-off-by: Wenchen Fan <[email protected]>
Thank you, @SaurabhChawla100 and @cloud-fan.
```
 * @return Returns the result schema string based on the canPruneCols flag.
 *         resultSchemaString will be created using resultsSchema in case of
 *         canPruneCols is true and for canPruneCols as false value
 *         resultSchemaString will be created using the actual dataSchema.
```
This description is not clear enough. This utility function also changes the value of conf; we need to document that.
@SaurabhChawla100 Could you submit a follow-up PR to improve the description?
@gatorsmile - This is the new helper method that we added as part of this PR.
Sure, I will update the description in a follow-up PR. Shall I raise the PR against a new JIRA or this same JIRA, since this JIRA is already resolved?
> Shall I raise the PR against a new JIRA or this same JIRA?

It's okay to refer to this JIRA ticket. Then, please add [FOLLOWUP] in the PR title.
### What changes were proposed in this pull request?
As part of PR #29045, a helper method was added. This PR is the follow-up to update the description of that helper method.
### Why are the changes needed?
For better readability and understanding of the code.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Since the only change is updating a description, ran the Spark shell.
Closes #29232 from SaurabhChawla100/SPARK-32234-Desc.
Authored-by: SaurabhChawla <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>