
Conversation

Member

@andygrove andygrove commented Feb 23, 2025

Which issue does this PR close?

Part of #1436

Closes #1437

Rationale for this change

This is a small fix that greatly reduces the number of spill files created during native shuffle.

There is now a maximum of one spill file per output partition. Previously, we had seen tens of thousands of spill files in some cases.

What changes are included in this PR?

  • Add spill_file to PartitionBuffer (see the sketch after this list)
  • Add unit tests to demonstrate the current behavior of memory pool interactions (this behavior has some issues, and I plan to create follow-up PRs to address the issues these tests expose)
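
A minimal sketch of the idea (not the actual Comet code; the struct fields and helper logic below are assumptions): each output partition lazily creates a single append-only spill file on its first spill and reuses it for every subsequent spill.

```rust
use std::fs::{File, OpenOptions};
use std::io::{self, Write};
use std::path::{Path, PathBuf};

struct PartitionBuffer {
    /// Encoded batches waiting to be spilled or written out (hypothetical field).
    frozen: Vec<u8>,
    /// Created lazily on the first spill, then reused (append-only).
    spill_file: Option<(PathBuf, File)>,
}

impl PartitionBuffer {
    fn spill(&mut self, spill_dir: &Path, partition: usize) -> io::Result<()> {
        if self.frozen.is_empty() {
            return Ok(()); // nothing to spill, so no file is created
        }
        if self.spill_file.is_none() {
            let path = spill_dir.join(format!("partition-{partition}.spill"));
            let file = OpenOptions::new().create(true).append(true).open(&path)?;
            self.spill_file = Some((path, file));
        }
        let (_, file) = self.spill_file.as_mut().unwrap();
        file.write_all(&self.frozen)?; // append; the file is never truncated
        self.frozen.clear();
        Ok(())
    }
}
```

Appending to one reused file keeps the spill-file count bounded by the number of output partitions rather than by the number of spill events.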

How are these changes tested?

I ran TPC-H locally with these settings and confirmed that I saw exchanges spilling.

    --conf spark.driver.memory=8G \
    --conf spark.executor.instances=1 \
    --conf spark.executor.cores=8 \
    --conf spark.cores.max=8 \
    --conf spark.executor.memory=8g \
    --conf spark.memory.offHeap.enabled=true \
    --conf spark.memory.offHeap.size=3g \

The spilled_bytes metric now looks reasonable:

[screenshot: spilled_bytes metric, 2025-02-23]

codecov-commenter commented Feb 23, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 58.59%. Comparing base (f09f8af) to head (d8613e7).
Report is 52 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1440      +/-   ##
============================================
+ Coverage     56.12%   58.59%   +2.46%     
- Complexity      976     1017      +41     
============================================
  Files           119      122       +3     
  Lines         11743    12223     +480     
  Branches       2251     2295      +44     
============================================
+ Hits           6591     7162     +571     
+ Misses         4012     3909     -103     
- Partials       1140     1152      +12     


@andygrove andygrove changed the title from "fix: Reduce number of shuffle spill files" to "[wip] fix: Reduce number of shuffle spill files and fix spilled_bytes metric" on Feb 24, 2025
@andygrove andygrove changed the title from "fix: Reduce number of shuffle spill files and fix spilled_bytes metric" to "fix: Reduce number of shuffle spill files, fix spilled_bytes metric, add some unit tests" on Feb 26, 2025
@andygrove andygrove marked this pull request as ready for review February 26, 2025 19:57
spill_file.seek(SeekFrom::Start(spill.offsets[i]))?;
std::io::copy(&mut spill_file.take(length), &mut output_data)
.map_err(Self::to_df_err)?;
if let Some(spill_data) = self.buffered_partitions[i].spill_file.as_ref() {
Contributor

@mbutrovich mbutrovich Feb 26, 2025

This is basically saying, if 1) We have a SpillFile, and 2) the length of that SpillFile is greater than 0 -> we need to copy that spilled data to the output buffer. My question is: because we're now reusing spill files instead of creating them for each spill event, when does the reused SpillFile get truncated back to 0 now that we've copied all of the data to output_data? If it's happening somewhere that I don't see, perhaps a comment here where that happens.

Member Author

Yes, that is correct. We never truncate the spill file. We just append to it. At the end of the shuffle, we copy the contents of each spill file to the shuffle file. I will add some comments to make this clearer.
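
A rough sketch of that end-of-shuffle step, with hypothetical names (the real code also records per-partition offsets for the index file): rewind each partition's append-only spill file, copy it into the shuffle data file, then write whatever is still buffered in memory.

```rust
use std::fs::File;
use std::io::{self, copy, BufWriter, Seek, SeekFrom, Write};

// Copy one partition's spilled and in-memory data into the shuffle data file.
fn write_partition(
    spill_file: Option<&mut File>, // reused, append-only spill file (if any data spilled)
    in_memory: &[u8],              // frozen batches still held in memory
    output_data: &mut BufWriter<File>,
) -> io::Result<u64> {
    let mut written = 0u64;
    if let Some(spill) = spill_file {
        // The file was only ever appended to, so rewind and copy all of it.
        spill.seek(SeekFrom::Start(0))?;
        written += copy(spill, output_data)?;
    }
    output_data.write_all(in_memory)?;
    written += in_memory.len() as u64;
    Ok(written)
}
```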

Member Author

I added some comments and also removed the check for length > 0 since that was redundant (we only create the spill file if we have data to spill)

Contributor

Makes sense. I misunderstood the granularity at which the spill file could be written to the output file.

@mbutrovich
Contributor

Thanks for tackling this, @andygrove! Great to see shuffle write improving.

assert_eq!(0, buffer.num_active_rows);
assert_eq!(0, buffer.frozen.len());
assert_eq!(0, buffer.reservation.size());
assert!(buffer.spill_file.is_some());
Contributor

assert_eq!(9914, buffer.spill_file.as_ref().unwrap().file.len())? That's the frozen buffer length above.

Member Author

Thanks for suggesting that. I have added it.

@andygrove
Member Author

@kazuyukitanimura @comphead could I get a committer approval?


#[test]
#[cfg_attr(miri, ignore)] // miri can't call foreign function `ZSTD_createCCtx`
#[cfg(not(target_os = "macos"))] // Github MacOS runner fails with "Too many open files".
Member Author

This test was previously failing because it created too many spill files. It now passes.

Contributor

@comphead comphead left a comment

Thanks @andygrove. Does that mean we get a single shuffle file per partition or a single file per executor?

@mbutrovich
Contributor

mbutrovich commented Feb 27, 2025

> Thanks @andygrove. Does that mean we get a single shuffle file per partition or a single file per executor?

Just to clarify: when you say shuffle file do you mean the final output or the spill file? My understanding is the final output is a single shuffle file and a single index file (the index provides partition offsets in the shuffle file) per executor.

@andygrove
Member Author

> Thanks @andygrove. Does that mean we get a single shuffle file per partition or a single file per executor?

For each ShuffleMapTask there will now be a maximum of one spill file per output partition. An executor could be running multiple tasks in parallel.
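
To put illustrative numbers on that (assumptions, not measurements): a task with 200 shuffle output partitions can now create at most 200 spill files, and 8 such tasks running concurrently on an executor would hold at most 8 × 200 = 1,600 spill files, whereas previously each spill event could add another file per partition, which is how counts reached the tens of thousands.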

@andygrove
Member Author

> Thanks @andygrove. Does that mean we get a single shuffle file per partition or a single file per executor?
>
> Just to clarify: when you say shuffle file do you mean the final output or the spill file? My understanding is the final output is a single shuffle file and a single index file (the index provides partition offsets in the shuffle file) per executor.

That's correct. There is no change to the shuffle data and index files, just fewer spill files.

Contributor

@comphead comphead left a comment

Thanks @andygrove, LGTM

@andygrove andygrove merged commit 928e1a2 into apache:main Feb 27, 2025
74 checks passed
@andygrove andygrove deleted the shuffle-quick-fix branch February 27, 2025 21:11
@andygrove
Member Author

Thanks for the reviews @mbutrovich and @comphead

coderfender pushed a commit to coderfender/datafusion-comet that referenced this pull request Dec 13, 2025

Development

Successfully merging this pull request may close these issues.

Shuffle spilled_bytes metric is incorrect
