Skip to content

Conversation

@andygrove
Copy link
Member

@andygrove andygrove commented Feb 26, 2025

Which issue does this PR close?

Closes #1448

Rationale for this change

We were reserving memory twice in native shuffle, resulting in excessive shuffling.

Here are results for TPC-H q9 with 3GB off-heap memory allocated:

Before (main branch)

        42.215020418167114,
        43.29415225982666,
        40.11583089828491,
        40.11201024055481,
        36.295708417892456

After

        33.27595615386963,
        30.483699560165405,
        31.230262994766235,
        31.28650164604187,
        31.095990657806396

What changes are included in this PR?

Stop double reserving memory.

How are these changes tested?

@andygrove andygrove marked this pull request as draft February 26, 2025 20:36
@codecov-commenter
Copy link

codecov-commenter commented Feb 26, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 58.56%. Comparing base (f09f8af) to head (c33900b).
Report is 53 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1452      +/-   ##
============================================
+ Coverage     56.12%   58.56%   +2.43%     
- Complexity      976     1015      +39     
============================================
  Files           119      122       +3     
  Lines         11743    12223     +480     
  Branches       2251     2295      +44     
============================================
+ Hits           6591     7158     +567     
+ Misses         4012     3913      -99     
- Partials       1140     1152      +12     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@andygrove andygrove force-pushed the shuffle-no-double-alloc branch from 47491a9 to 4cfe993 Compare February 27, 2025 21:17
num_output_partitions: usize,
runtime: Arc<RuntimeEnv>,
metrics: ShuffleRepartitionerMetrics,
reservation: MemoryReservation,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main change; removing the memory tracking in ShuffleRepartitioner because we already track the memory in each instance of PartitionedBuffer.

Comment on lines -475 to -509
let mut mem_diff = self
.append_rows_to_partition(
input.columns(),
&shuffled_partition_ids[start..end],
partition_id,
)
.await?;

if mem_diff > 0 {
let mem_increase = mem_diff as usize;

let try_grow = {
let mut mempool_timer = self.metrics.mempool_time.timer();
let result = self.reservation.try_grow(mem_increase);
mempool_timer.stop();
result
};

if try_grow.is_err() {
self.spill().await?;
let mut mempool_timer = self.metrics.mempool_time.timer();
self.reservation.free();
self.reservation.try_grow(mem_increase)?;
mempool_timer.stop();
mem_diff = 0;
}
}

if mem_diff < 0 {
let mem_used = self.reservation.size();
let mem_decrease = mem_used.min(-mem_diff as usize);
let mut mempool_timer = self.metrics.mempool_time.timer();
self.reservation.shrink(mem_decrease);
mempool_timer.stop();
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need any of this memory accounting because it is already handled within append_rows_to_partition


mem_diff += self.active_slots_mem_size as isize;
}
Ok(mem_diff)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to return memory size here because we already reserved the memory in this method

}
start = end;
}
AppendRowStatus::MemDiff(Ok(mem_diff))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to return memory size here because all accounting already took place in the calls to allocate_active_builders and flush in this method

mempool_timer.stop();

mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize;
Ok(mem_diff)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to return memory size because memory accounting already happened in this method.

Comment on lines -1113 to +1069
// TODO reservation should not be zero because there are active builders again
assert_eq!(0, buffer.reservation.size());
assert_eq!(106496, buffer.reservation.size());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This demonstrates that the memory accounting is now more accurate

@andygrove andygrove marked this pull request as ready for review February 28, 2025 15:06
@andygrove
Copy link
Member Author

@mbutrovich This PR is ready for review now

Copy link
Contributor

@mbutrovich mbutrovich left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find "active" vague and the mixing of "active rows" and "active slots" a bit confusing in PartitionBuffer, but that shouldn't stop this PR. Nicely done, @andygrove!

@andygrove
Copy link
Member Author

Thanks for the reviews @viirya and @mbutrovich. I agree that we could update some of the naming.

@andygrove andygrove merged commit b149983 into apache:main Feb 28, 2025
74 checks passed
@andygrove andygrove deleted the shuffle-no-double-alloc branch February 28, 2025 16:40
coderfender pushed a commit to coderfender/datafusion-comet that referenced this pull request Dec 13, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Native shuffle double allocates memory

4 participants