Skip to content

Conversation

@parthchandra
Copy link
Contributor

Which issue does this PR close?

Partly addresses test failures caused by #1348

Rationale for this change

As the issue points out, datafusion comet returns different values from Spark for uint_8 and uint_16 parquet types that may have the sign bit set.

What changes are included in this PR?

Rewrites the parquet test files to not use the uint_8 and uint16 types if the complex type readers are enabled.

How are these changes tested?

Locally using existing unit tests. Note that the unit tests still fail, but not because of unsigned ints

@codecov-commenter
Copy link

codecov-commenter commented Feb 7, 2025

Codecov Report

❌ Patch coverage is 45.45455% with 6 lines in your changes missing coverage. Please review.
✅ Project coverage is 39.07%. Comparing base (f09f8af) to head (ddfac54).
⚠️ Report is 350 commits behind head on main.

Files with missing lines Patch % Lines
...org/apache/comet/CometSparkSessionExtensions.scala 0.00% 0 Missing and 3 partials ⚠️
.../main/scala/org/apache/comet/DataTypeSupport.scala 25.00% 2 Missing and 1 partial ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##               main    #1376       +/-   ##
=============================================
- Coverage     56.12%   39.07%   -17.06%     
- Complexity      976     2077     +1101     
=============================================
  Files           119      263      +144     
  Lines         11743    60767    +49024     
  Branches       2251    12921    +10670     
=============================================
+ Hits           6591    23742    +17151     
- Misses         4012    32534    +28522     
- Partials       1140     4491     +3351     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, valueRanges + 1)
withParquetTable(path.toString, "tbl") {
if (CometSparkSessionExtensions.isComplexTypeReaderEnabled(conf)) {
checkSparkAnswer("select _9, _10 FROM tbl order by _11")
Copy link
Member

@andygrove andygrove Feb 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we already have logic to fall back to Spark when the complex type reader is enabled and when the query references uint Parquet fields?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No we don't for two reasons. Firstly, in the plan we get the schema as understood by Spark so all the signed int_8 and int_16 values are indistinguishable from the unsigned ones. As a result we fall back to Spark for both signed and unsigned integers. Secondly, too many unit tests fail because we check that the plan contains a comet operator and would need to be modified.
I'm open to putting it back though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a result we fall back to Spark for both signed and unsigned integers.

Just 8 and 16 bit, or all integers? I'm fine with falling back for 8 and 16 bit for now, although it would be nice to have a config to override this (with the understanding that behavior is incorrect for unsigned integers).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just 8 and 16 bit.
I started with the fallback to spark and a compat override. The reason I reverted it is that I couldn't see a way to get to compatibility with spark even after/if apache/arrow-rs#7040 is addressed.
Let me do as you suggest. Marking this as draft in the meantime.

@parthchandra parthchandra marked this pull request as draft February 7, 2025 18:39
@parthchandra parthchandra marked this pull request as ready for review February 10, 2025 17:07
@parthchandra
Copy link
Contributor Author

@andygrove updates this to fallback, updated the unit tests and removed the draft tag

pageSize: Int = 128,
randomSize: Int = 0): Unit = {
val schemaStr =
def getAllTypesParquetSchema: String = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are renaming this method, I wonder if we should remove the AllTypes part since it does not generate all types. Perhaps getPrimitiveTypesParquetSchema?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

sql("SELECT array_remove(array(_2, _3,_4), _3) from t1 where _3 is not null"))
checkSparkAnswerAndOperator(sql(
"SELECT array_remove(case when _2 = _3 THEN array(_2, _3,_4) ELSE null END, _3) from t1"))
withSQLConf(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key -> "true") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should default COMET_SCAN_ALLOW_INCOMPATIBLE=true in CometTestBase and then just disable it in specific tests?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be okay with that. Most Spark users will not have unsigned ints, and having it false creates a penalty for users who do not have any unsigned ints unless they explicitly set the allow incompatible flag.
Changing this and reverting the unit tests which had to explicitly set the flag.

conf("spark.comet.scan.allowIncompatible")
.doc(
"Comet is not currently fully compatible with Spark for all datatypes. " +
s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We link to the Compatibility Guide here but there is no new information in that guide about handling for byte/short, so would be good to add that. This could be done in a follow on PR.

org.apache.spark.SPARK_VERSION >= "4.0"
}

def isComplexTypeReaderEnabled(conf: SQLConf): Boolean = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the naming confusing here. This method determines if we are using native_datafusion or native_iceberg_compat (which both use DataFusion's ParquetExec). This is no logic related to complex types.

Complex type support was a big motivation for adding these new scans, but it doesn't seem to make sense to refer to complex types in the changes in this PR.

This is just a nit, and we can rename the methods in a future PR.

Copy link
Member

@andygrove andygrove left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks @parthchandra. I left some comments but they are nits that we can address in follow on PRs.

"Comet is not currently fully compatible with Spark for all datatypes. " +
s"Set this config to true to allow them anyway. $COMPAT_GUIDE.")
.booleanConf
.createWithDefault(true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we should default this to false because it is a correctness issue, and explicitly set this to true in CometTestBase.

@andygrove
Copy link
Member

I created a follow on PR #1398

@andygrove andygrove merged commit 57a4dca into apache:main Feb 13, 2025
74 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants