Skip to content

Conversation

colin-ho
Copy link
Contributor

@colin-ho colin-ho commented Jul 29, 2025

Changes Made

Add broadcast joins to flotilla. The translation logic for determining join strategy + broadcast join threshold is copied from the old ray runner's physical plan translator.

The broadcast join works by simply materializing the partition refs for the broadcast side, then copying them to the each of the receiver sides task (aka broadcast), and then adding a hash join instruction.

Related Issues

Checklist

  • Documented in API Docs (if applicable)
  • Documented in User Guide (if applicable)
  • If adding a new documentation page, doc is added to docs/mkdocs.yml navigation
  • Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)

@github-actions github-actions bot added the feat label Jul 29, 2025
@colin-ho colin-ho marked this pull request as ready for review July 29, 2025 16:18
Copy link
Contributor

@greptile-apps greptile-apps bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Greptile Summary

This PR implements broadcast joins for the Flotilla distributed execution engine, a significant performance optimization for joins where one table is much smaller than the other. The implementation adds a new BroadcastJoinNode that materializes the smaller table and broadcasts it to all partitions of the larger table, avoiding expensive shuffling operations.

The core changes include:

  1. New BroadcastJoinNode: A complete implementation in broadcast_join.rs that handles the broadcast join execution pattern - materializing the broadcast side first, then copying it to each receiver task with hash join instructions.

  2. Join Strategy Logic: The determine_join_strategy method in translate.rs implements threshold-based decision making (10MB default) and partitioning compatibility checks to choose between Hash, Broadcast, Cross, and SortMerge strategies.

  3. Code Refactoring: Several files were updated to support the new functionality, including extracting reusable logic for creating in-memory scans from materialized outputs and renaming functions for clarity.

  4. Side Swapping Logic: Proper handling of join semantics where the broadcast/receiver sides may need to be swapped based on join type (e.g., LEFT joins require the left side to remain as receiver).

The implementation reuses logic from the existing Ray runner to ensure consistency in join strategy determination and maintains the distributed pipeline abstraction while enabling significant performance improvements for star-schema and similar query patterns.

Confidence score: 4/5

• This PR is generally safe to merge with solid implementation of broadcast joins following established patterns
• Score reflects the complexity of distributed join logic and potential for subtle edge cases in join semantics or memory management
• The broadcast_join.rs file needs the most attention due to its core role in the new functionality

7 files reviewed, 1 comment

Edit Code Review Bot Settings | Greptile

Comment on lines 140 to 146
// For broadcast joins, ensure that the left side of the join is the smaller side
let (smaller_size_bytes, left_is_larger) = if right_stats.size_bytes < left_stats.size_bytes
{
(right_stats.size_bytes, true)
} else {
(left_stats.size_bytes, false)
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: The comment says 'ensure that the left side of the join is the smaller side' but the logic actually identifies which side is larger/smaller without enforcing left to be smaller. Consider updating the comment for clarity.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kind of makes a decent point, maybe update the comment to say "determine the smaller side"

Copy link

codecov bot commented Jul 29, 2025

Codecov Report

❌ Patch coverage is 87.81726% with 48 lines in your changes missing coverage. Please review.
✅ Project coverage is 79.27%. Comparing base (b5cb9cd) to head (1737033).
⚠️ Report is 3 commits behind head on main.

Files with missing lines Patch % Lines
...stributed/src/pipeline_node/join/broadcast_join.rs 75.88% 41 Missing ⚠️
...stributed/src/pipeline_node/join/translate_join.rs 96.25% 7 Missing ⚠️
Additional details and impacted files

Impacted file tree graph

@@            Coverage Diff             @@
##             main    #4867      +/-   ##
==========================================
+ Coverage   79.26%   79.27%   +0.01%     
==========================================
  Files         908      910       +2     
  Lines      125851   126141     +290     
==========================================
+ Hits        99751   100003     +252     
- Misses      26100    26138      +38     
Files with missing lines Coverage Δ
src/daft-distributed/src/pipeline_node/gather.rs 91.00% <100.00%> (ø)
...ft-distributed/src/pipeline_node/join/hash_join.rs 75.15% <100.00%> (ø)
src/daft-distributed/src/pipeline_node/limit.rs 83.70% <100.00%> (ø)
src/daft-distributed/src/pipeline_node/mod.rs 89.91% <100.00%> (+0.60%) ⬆️
.../daft-distributed/src/pipeline_node/repartition.rs 89.93% <100.00%> (ø)
...rc/daft-distributed/src/pipeline_node/translate.rs 96.60% <100.00%> (+0.13%) ⬆️
src/daft-distributed/src/stage/stage_builder.rs 95.78% <100.00%> (+0.04%) ⬆️
...stributed/src/pipeline_node/join/translate_join.rs 96.25% <96.25%> (ø)
...stributed/src/pipeline_node/join/broadcast_join.rs 75.88% <75.88%> (ø)

... and 5 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@colin-ho colin-ho requested a review from srilman July 29, 2025 18:22
Copy link
Contributor

@srilman srilman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should LGTM, just a couple of things

where
F: FnOnce(LocalPhysicalPlanRef) -> LocalPhysicalPlanRef + Send + Sync + 'static,
{
fn make_in_memory_scan_from_materialized_outputs(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still use this function anywhere?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah its used in broadcast join

join_plan,
config,
psets,
SchedulingStrategy::Spread,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bit of an aside, do you think having a scheduling strategy here makes sense any more? Cause for operators like this, are they able to use the scheduling strategy of the children task they use (in this case, the input task from receiver_input)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah i think it makes sense to just use the scheduling strategy of the child.

On this note, I want to refactor the logic around 'building' tasks. Right now the only api to do so is task.with_new_task which accepts a new swordfish task, but then there's still a lot of manual labor needed to make this new swordfish task in the first place, involving things like scheduling strategy, the context, task id, psets, etc.

Comment on lines 140 to 146
// For broadcast joins, ensure that the left side of the join is the smaller side
let (smaller_size_bytes, left_is_larger) = if right_stats.size_bytes < left_stats.size_bytes
{
(right_stats.size_bytes, true)
} else {
(left_stats.size_bytes, false)
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kind of makes a decent point, maybe update the comment to say "determine the smaller side"

@colin-ho colin-ho requested a review from srilman August 4, 2025 21:25
@colin-ho colin-ho merged commit 00386c9 into main Aug 8, 2025
50 checks passed
@colin-ho colin-ho deleted the colin/flotilla-broadcast-join branch August 8, 2025 21:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants