Skip to content

Conversation

ohbh
Copy link
Contributor

@ohbh ohbh commented Aug 18, 2025

Changes Made

Implemented distributed sort in the flotilla engine.

Approach:

  • materialize all inputs
  • sample inputs to find distribution of rows
  • repartition based on distribution
  • sort the final partitions

Special cases:

  • if the input has a single partition, simply sort the partition

Related Issues

Checklist

  • Documented in API Docs (if applicable)
  • Documented in User Guide (if applicable)
  • If adding a new documentation page, doc is added to docs/mkdocs.yml navigation
  • Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)

@ohbh ohbh requested review from srilman and colin-ho August 18, 2025 20:29
@github-actions github-actions bot added the feat label Aug 18, 2025
Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Greptile Summary

This PR implements distributed sort functionality in the flotilla engine, replacing the previous broadcast-only sort approach with a scalable multi-stage distributed algorithm. The implementation follows a classic distributed sorting pattern with four main phases:

  1. Materialization: All input partitions are materialized to enable sampling
  2. Sampling: Data is sampled across partitions to determine value distribution and calculate optimal partition boundaries
  3. Range Repartitioning: Data is repartitioned based on the calculated boundaries so each partition contains a contiguous range of values
  4. Local Sorting: Each partition is sorted independently while maintaining global sort order

The changes span multiple components of the system:

  • Data structures: New RangeRepartitionConfig and RangeClusteringConfig structs in the logical plan layer store partition boundaries as RecordBatch objects
  • Execution layer: Modified SortNode removes single-partition restrictions and implements the full distributed algorithm with async execution
  • Repartitioning: Extended RepartitionSink and shuffle translation to support range-based partitioning
  • Python integration: Added Ray-based boundary calculation functions that concatenate materialized data and compute quantiles
  • Dependencies: Added necessary crate dependencies (daft-io, daft-micropartition, daft-recordbatch) and visibility changes to support the new functionality

The implementation includes an optimization for single-partition inputs that bypasses the distributed logic and directly sorts the partition. This distributed approach enables the flotilla engine to handle large-scale sorting operations efficiently across multiple workers while maintaining correctness guarantees.

Confidence score: 4/5

  • This PR implements complex distributed sorting logic with proper error handling and follows established patterns
  • Score reflects the complexity of distributed algorithms and potential for subtle race conditions or boundary edge cases
  • Pay close attention to the async execution flow in sort.rs and boundary calculation logic in the Python integration

12 files reviewed, 3 comments

Edit Code Review Bot Settings | Greptile

@ohbh ohbh force-pushed the oh/implement_distributed_sort branch from 3336b4c to 32320fa Compare August 18, 2025 22:10
Copy link

codecov bot commented Aug 19, 2025

Codecov Report

❌ Patch coverage is 8.30946% with 320 lines in your changes missing coverage. Please review.
✅ Project coverage is 76.40%. Comparing base (64b3fe1) to head (52c144f).
⚠️ Report is 3 commits behind head on main.

Files with missing lines Patch % Lines
src/daft-distributed/src/pipeline_node/sort.rs 0.00% 198 Missing ⚠️
src/daft-distributed/src/utils/transpose.rs 0.00% 47 Missing ⚠️
src/daft-logical-plan/src/partitioning.rs 0.00% 36 Missing ⚠️
src/daft-local-execution/src/sinks/repartition.rs 0.00% 18 Missing ⚠️
...aft-local-execution/src/intermediate_ops/sample.rs 57.14% 9 Missing ⚠️
...ed/src/pipeline_node/shuffles/translate_shuffle.rs 0.00% 3 Missing ⚠️
...tributed/src/pipeline_node/shuffles/repartition.rs 0.00% 2 Missing ⚠️
src/daft-local-execution/src/pipeline.rs 66.66% 2 Missing ⚠️
src/daft-local-plan/src/plan.rs 50.00% 2 Missing ⚠️
daft/runners/flotilla.py 90.90% 1 Missing ⚠️
... and 2 more
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #4991      +/-   ##
==========================================
+ Coverage   76.11%   76.40%   +0.29%     
==========================================
  Files         933      934       +1     
  Lines      128478   128756     +278     
==========================================
+ Hits        97787    98376     +589     
+ Misses      30691    30380     -311     
Files with missing lines Coverage Δ
src/daft-distributed/src/python/mod.rs 30.39% <ø> (ø)
src/daft-distributed/src/stage/stage_builder.rs 95.23% <ø> (-0.22%) ⬇️
src/daft-local-plan/src/translate.rs 91.01% <100.00%> (ø)
src/daft-recordbatch/src/lib.rs 85.31% <ø> (ø)
daft/runners/flotilla.py 48.48% <90.90%> (+3.85%) ⬆️
src/daft-distributed/src/pipeline_node/sample.rs 0.00% <0.00%> (ø)
...rc/daft-distributed/src/pipeline_node/translate.rs 24.18% <0.00%> (+0.26%) ⬆️
...tributed/src/pipeline_node/shuffles/repartition.rs 47.32% <0.00%> (+8.57%) ⬆️
src/daft-local-execution/src/pipeline.rs 81.94% <66.66%> (-0.21%) ⬇️
src/daft-local-plan/src/plan.rs 69.02% <50.00%> (-0.10%) ⬇️
... and 6 more

... and 14 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@ohbh ohbh force-pushed the oh/implement_distributed_sort branch from 32320fa to 2420524 Compare August 19, 2025 18:33
@ohbh ohbh force-pushed the oh/implement_distributed_sort branch from a28d80c to 52c144f Compare August 20, 2025 00:08
Copy link
Contributor

@colin-ho colin-ho left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome!

@ohbh ohbh enabled auto-merge (squash) August 20, 2025 01:59
@ohbh ohbh merged commit 12301c7 into main Aug 20, 2025
54 checks passed
@ohbh ohbh deleted the oh/implement_distributed_sort branch August 20, 2025 03:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants