
Conversation

andygrove (Member) commented Apr 17, 2025

Which issue does this PR close?

Closes #.

Rationale for this change

What changes are included in this PR?

  • Use common supportedDataType method for columnar and native shuffle
  • Add TimestampNTZType as a supported type
  • Add fuzz test for shuffle that asserts that shuffle is native when experimental native scans are enabled

How are these changes tested?

andygrove force-pushed the shuffle-type-checks branch from 0365fb9 to eec034a on April 17, 2025 17:08

def supportedShuffleDataType(dt: DataType): Boolean = dt match {
  case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
      _: DoubleType | _: StringType | _: BinaryType | _: TimestampType | _: TimestampNTZType |
andygrove (Member Author) commented Apr 17, 2025

This code was moved and is not new. I added TimestampNTZType.

Contributor

Do we have a test with TimestampNTZType for shuffle?

andygrove (Member Author)

Yes, I found that TimestampNTZType was not supported because the test was initially failing. The fuzz test generates a file with all supported types (but maps are currently explicitly disabled in this test suite).
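
For context, a TimestampNTZ column for a test like this can be built from java.time.LocalDateTime values, which recent Spark versions (3.4+, the versions Comet targets) encode as TimestampNTZType. The following is only an illustrative sketch, not the PR's fuzz generator; the column names and app name are made up.

import java.time.LocalDateTime

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ntz-shuffle-sketch").getOrCreate()
import spark.implicits._

// LocalDateTime values encode as TimestampNTZType, so shuffling this DataFrame
// exercises the newly supported type.
val ntzDf = Seq(
  (1, LocalDateTime.parse("2024-01-01T00:00:00")),
  (2, LocalDateTime.parse("2024-06-15T12:34:56")))
  .toDF("id", "ts_ntz")

ntzDf.repartition(4, ntzDf("id")).count() // forces a shuffle that carries the ts_ntz column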

      _: DateType | _: BooleanType =>
    true
  case _ =>
    // Native shuffle doesn't support struct/array yet
andygrove (Member Author)

yes, it does! This method is removed and we now have a single supportedShuffleDataType method that is used for both native and columnar shuffle type checks.
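
For readers following the thread, here is a rough sketch of what a single shared check can look like, pieced together from the diff excerpts above; the handling of nested types is an assumption here, and the PR's exact case list may differ.

import org.apache.spark.sql.types._

// Sketch only: one type check shared by native and columnar shuffle.
def supportedShuffleDataType(dt: DataType): Boolean = dt match {
  case _: ByteType | _: ShortType | _: IntegerType | _: LongType | _: FloatType |
      _: DoubleType | _: StringType | _: BinaryType | _: TimestampType |
      _: TimestampNTZType | _: DecimalType | _: DateType | _: BooleanType =>
    true
  // Assumed: nested types are accepted when all of their children are supported.
  case StructType(fields) => fields.forall(f => supportedShuffleDataType(f.dataType))
  case ArrayType(elementType, _) => supportedShuffleDataType(elementType)
  case _ => false
}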

Contributor

Thanks for that, I was so confused about having this supported check in at least 3 places

/**
* Determine which data types are supported as hash-partition keys in a shuffle.
*/
def supportedShufflePartitionDataType(dt: DataType): Boolean = dt match {
Contributor

Suggested change:
- def supportedShufflePartitionDataType(dt: DataType): Boolean = dt match {
+ def supportedShufflePartitionKeyDataType(dt: DataType): Boolean = dt match {

andygrove (Member Author)

I applied this change myself since I had to update the call sites as well.

}

/**
* Determine which data types are supported as hash-partition keys in a shuffle.
Contributor

Suggested change:
- * Determine which data types are supported as hash-partition keys in a shuffle.
+ * Determine which data types are supported as hash-partition keys in a shuffle.
+ * The hash-partition key determines how data should be collocated for operations like `groupByKey`, `reduceByKey`, or `join`.
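
As a concrete illustration of the suggested wording (not code from the PR), the operations below hash-partition rows by a key column, which is why the key's data type must pass this check; the DataFrame and column names are hypothetical.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().appName("hash-partition-key-example").getOrCreate()
val orders = spark.read.parquet("/tmp/orders") // hypothetical input

// Explicit hash partitioning: rows with the same user_id land in the same partition.
val byKey = orders.repartition(10, col("user_id"))

// groupBy (and joins) also shuffle rows by their key columns under hash partitioning.
val counts = orders.groupBy(col("user_id")).count()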

andygrove (Member Author)

Updated

andygrove marked this pull request as ready for review on April 17, 2025 19:02
andygrove (Member Author)

I'm seeing a number of failures like this:

2025-04-17T19:12:54.7914570Z - columnar shuffle on struct including nulls *** FAILED *** (352 milliseconds)
2025-04-17T19:12:54.7916470Z   List() had length 0 instead of expected length 1 Sort [_1#4948 ASC NULLS FIRST], false, 0
2025-04-17T19:12:54.7917940Z   +- Exchange hashpartitioning(_1#4948, _2#4949, 10), REPARTITION_BY_NUM, [plan_id=13332]
2025-04-17T19:12:54.7919190Z      +- Filter (isnotnull(_1#4948) AND (_1#4948 > 1))
2025-04-17T19:12:54.7922160Z         +- FileScan parquet [_1#4948,_2#4949] Batched: true, DataFilters: [isnotnull(_1#4948), (_1#4948 > 1)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/Users/runner/work/datafusion-comet/datafusion-comet/spark/target..., PartitionFilters: [], PushedFilters: [IsNotNull(_1), GreaterThan(_1,1)], ReadSchema: struct<_1:int,_2:struct<_1:int,_2:string>>


codecov-commenter commented Apr 18, 2025

Codecov Report

Attention: Patch coverage is 88.46154% with 6 lines in your changes missing coverage. Please review.

Project coverage is 58.80%. Comparing base (f09f8af) to head (b5b4d27).
Report is 149 commits behind head on main.

Files with missing lines                              | Patch % | Lines
.../scala/org/apache/comet/serde/QueryPlanSerde.scala | 90.69%  | 0 Missing and 4 partials ⚠️
...org/apache/comet/CometSparkSessionExtensions.scala | 77.77%  | 0 Missing and 2 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1655      +/-   ##
============================================
+ Coverage     56.12%   58.80%   +2.68%     
- Complexity      976     1082     +106     
============================================
  Files           119      125       +6     
  Lines         11743    12592     +849     
  Branches       2251     2362     +111     
============================================
+ Hits           6591     7405     +814     
- Misses         4012     4015       +3     
- Partials       1140     1172      +32     


andygrove (Member Author)

@parthchandra @mbutrovich This PR is now ready for review.

mbutrovich (Contributor)

Shout out to @Kontinuation! #1511 removed a lot of the custom logic in the shuffle writer that would have needed to be extended to support complex types. Instead we now rely on Arrow functions that already support complex types.

comphead (Contributor) left a comment

lgtm thanks @andygrove

}

test("shuffle") {
  val df = spark.read.parquet(filename)
Contributor

Does the data have complex types?

andygrove (Member Author)

Yes, the data has arrays and structs but not maps yet
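
For anyone who wants to reproduce something similar locally, here is a rough sketch (not Comet's actual fuzz generator) of writing a Parquet file whose schema mixes primitive columns with an array and a struct while leaving maps out; the case classes, column names, and values are made up.

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("shuffle-fuzz-sketch").getOrCreate()
import spark.implicits._

case class Inner(a: Int, b: String)
case class FuzzRow(
    c_int: Int,
    c_long: Long,
    c_double: Double,
    c_string: String,
    c_ts: Timestamp,
    c_array: Seq[Int],
    c_struct: Inner)

val df = Seq(
  FuzzRow(1, 10L, 1.5, "x", Timestamp.valueOf("2024-01-01 00:00:00"), Seq(1, 2, 3), Inner(1, "a")),
  FuzzRow(2, 20L, 2.5, "y", Timestamp.valueOf("2024-06-15 12:34:56"), Seq(4, 5), Inner(2, "b")))
  .toDF()

// Maps are intentionally left out, matching the note that the suite disables them for now.
df.write.mode("overwrite").parquet("/tmp/shuffle_fuzz_input")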


 * Determine which data types are supported in a shuffle.
 */
def supportedShuffleDataType(dt: DataType): Boolean = dt match {
  case _: BooleanType => true
Contributor

nit: BooleanType moved here alone because of the code style checks?

andygrove (Member Author)

No. At one point I was seeing errors related to boolean and had made functional changes here that I later reverted.

andygrove (Member Author)

I reverted the style change

Comment on lines -2923 to +2939
-       _: DoubleType | _: TimestampType | _: TimestampType | _: DecimalType | _: DateType =>
+       _: DoubleType | _: TimestampType | _: TimestampNTZType | _: DecimalType |
andygrove (Member Author)

This is unrelated to the goal of the PR, but I noticed we had TimestampType listed twice and no TimestampNTZType.

andygrove merged commit c04784a into apache:main on Apr 19, 2025
78 checks passed
andygrove deleted the shuffle-type-checks branch on July 10, 2025 22:08
coderfender pushed a commit to coderfender/datafusion-comet that referenced this pull request Dec 13, 2025