ohbh
Contributor

@ohbh ohbh commented Aug 11, 2025

Changes Made

This PR adds support for DataFrame.into_batches() in the distributed (Flotilla/Ray) engine and wires it through the pipeline.

The node logic is as follows:

  1. Materialize the downstream pipeline with an IntoBatches operator of batch_size.
  2. Collect the materialized outputs from the stream.
  3. When the number of rows in the collected outputs reaches batch_size * 0.8 (batch_size_threshold), submit a new task with an IntoBatches operator.
  4. Send all remaining outputs in a downstream task.
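
The accumulation in steps 2 to 4 can be sketched roughly as follows. This is a simplified illustration, not the code in this PR: `group_into_tasks` is a hypothetical helper, each materialized output is modeled only by its row count, and only the 0.8 threshold factor is taken from the description above.

```rust
/// Hypothetical stand-in for the node's accumulation step (not the PR's code).
/// Each entry of `row_counts` models the row count of one materialized output.
/// Returns the groups of outputs that would each be submitted as one task.
pub fn group_into_tasks(row_counts: &[usize], batch_size: usize) -> Vec<Vec<usize>> {
    // Threshold factor 0.8 is taken from the PR description.
    let threshold = batch_size as f64 * 0.8;
    let mut tasks: Vec<Vec<usize>> = Vec::new();
    let mut pending: Vec<usize> = Vec::new();
    let mut pending_rows: usize = 0;

    for &rows in row_counts {
        pending.push(rows);
        pending_rows += rows;
        // Step 3: enough rows collected, so hand the group off as a new task.
        if pending_rows as f64 >= threshold {
            tasks.push(std::mem::take(&mut pending));
            pending_rows = 0;
        }
    }

    // Step 4: whatever is left over goes out in one last task.
    if !pending.is_empty() {
        tasks.push(pending);
    }
    tasks
}
```

With a batch size of 100, inputs of 30, 30, 30, 50 and 10 rows would be grouped as three outputs (90 rows) followed by a remainder of two outputs (60 rows). Grouping at 80% of the target rather than at exactly `batch_size` is why the resulting sizes are approximate.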

For steps 3 and 4, it's important to submit the next task with an IntoBatches operator, because that forces the local pipeline to combine all the RecordBatches in the given MicroPartition. This combining is done in the local execution op:

    let out = match input.concat_or_get(IOStatsContext::new("into_batches"))? {
        Some(record_batch) => Arc::new(MicroPartition::new_loaded(
            input.schema(),
            Arc::new(vec![record_batch]),
            None,
        )),
        None => Arc::new(MicroPartition::empty(Some(input.schema()))),
    };
    Ok((state, IntermediateOperatorResult::NeedMoreInput(Some(out))))
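
To illustrate the combining step in isolation, here is a toy model of what the snippet above relies on. It assumes, based on the method name and the `Some`/`None` handling shown, that `concat_or_get` yields nothing for an empty partition, the single batch when there is only one, and a concatenation otherwise. The function below is a hypothetical stand-in that uses `Vec<i64>` in place of a RecordBatch; it is not Daft's implementation.

```rust
/// Toy model of the assumed `concat_or_get` behavior (not Daft's code).
/// A "batch" is modeled as a Vec<i64>; a "partition" as a Vec of batches.
pub fn concat_or_get(batches: Vec<Vec<i64>>) -> Option<Vec<i64>> {
    match batches.len() {
        // Empty partition: nothing to return, caller builds an empty output.
        0 => None,
        // Single batch: return it as-is, no copy needed.
        1 => batches.into_iter().next(),
        // Several batches: merge them into one contiguous batch.
        _ => Some(batches.into_iter().flatten().collect()),
    }
}
```

Under this assumption, every non-empty partition leaving the operator holds exactly one batch, which is why the distributed node routes its grouped outputs through an IntoBatches operator again.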

Related Issues

Native runner: #4935

Checklist

  • Documented in API Docs (if applicable)
  • Documented in User Guide (if applicable)
  • If adding a new documentation page, doc is added to docs/mkdocs.yml navigation
  • Documentation builds and is formatted properly (tag @/ccmao1130 for docs review)

@ohbh ohbh requested review from srilman and colin-ho August 11, 2025 23:44
@github-actions github-actions bot added the feat label Aug 11, 2025
Contributor

@greptile-apps greptile-apps bot left a comment

Greptile Summary

This PR implements support for DataFrame.into_batches() in the distributed (Flotilla/Ray) execution engine, achieving parity with the native execution engine. The implementation adds a new distributed pipeline node (IntoBatchesNode) that handles batching logic across distributed workers.

The changes span multiple layers of the distributed execution stack:

  1. Pipeline Node Infrastructure: Adds a new IntoBatchesNode in src/daft-distributed/src/pipeline_node/into_batches.rs that implements the distributed batching strategy. The node materializes downstream pipeline outputs, accumulates them until reaching the target batch size, then submits tasks that force local pipeline combination of RecordBatches within MicroPartitions.

  2. Translation Layer: Updates the pipeline translator in translate.rs to handle LogicalPlan::IntoBatches operations by creating IntoBatchesNode instances with appropriate configuration.

  3. Stage Builder: Moves IntoBatches from the list of unsupported operations to supported operations in stage_builder.rs, enabling the distributed engine to process logical plans containing batching operations.

  4. API Surface: Removes the runtime restriction in dataframe.py that previously prevented into_batches() from being used with the Ray runner.

  5. Test Coverage: Updates tests to run on both native and distributed engines, with adjusted assertions to accommodate the "best effort" nature of distributed batching where exact batch sizes cannot be guaranteed but minimum sizes are maintained.

The distributed implementation follows a streaming approach where it collects materialized outputs and creates new tasks when accumulated data reaches the batch size threshold, ensuring efficient memory usage and proper data distribution across workers.

Confidence score: 1/5

  • This PR contains a critical compilation bug that will prevent it from working
  • Score reflects a serious issue in the core IntoBatches implementation that uses a moved variable
  • Pay close attention to src/daft-distributed/src/pipeline_node/into_batches.rs line 124

6 files reviewed, 1 comment

@ohbh ohbh force-pushed the oh/into_batches_distributed branch from c767693 to 8bd0b06 Compare August 12, 2025 03:31
codecov bot commented Aug 12, 2025

Codecov Report

❌ Patch coverage is 82.05128% with 28 lines in your changes missing coverage. Please review.
✅ Project coverage is 79.57%. Comparing base (a05ad3c) to head (a191129).
⚠️ Report is 1 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...daft-distributed/src/pipeline_node/into_batches.rs | 80.95% | 28 Missing ⚠️ |

Additional details and impacted files

@@            Coverage Diff             @@
##             main    #4958      +/-   ##
==========================================
+ Coverage   78.03%   79.57%   +1.54%     
==========================================
  Files         916      917       +1     
  Lines      126917   126934      +17     
==========================================
+ Hits        99043   101014    +1971     
+ Misses      27874    25920    -1954     

| Files with missing lines | Coverage Δ |
|---|---|
| daft/dataframe/dataframe.py | 86.75% <ø> (+0.06%) ⬆️ |
| src/daft-distributed/src/pipeline_node/mod.rs | 89.91% <ø> (ø) |
| ...rc/daft-distributed/src/pipeline_node/translate.rs | 97.10% <100.00%> (+0.53%) ⬆️ |
| src/daft-distributed/src/stage/stage_builder.rs | 95.50% <ø> (+2.09%) ⬆️ |
| ...daft-distributed/src/pipeline_node/into_batches.rs | 80.95% <80.95%> (ø) |

... and 36 files with indirect coverage changes

Contributor

@colin-ho colin-ho left a comment

Mostly looks good!

@ohbh ohbh force-pushed the oh/into_batches_distributed branch from 8bd0b06 to 39b8227 Compare August 12, 2025 17:24
Contributor

@srilman srilman left a comment

Great job on this!

@ohbh ohbh force-pushed the oh/into_batches_distributed branch from b3c1597 to 1555bb0 Compare August 12, 2025 19:45
@ohbh ohbh force-pushed the oh/into_batches_distributed branch from 1555bb0 to a191129 Compare August 12, 2025 20:08
@ohbh ohbh enabled auto-merge (squash) August 12, 2025 21:01
@ohbh ohbh merged commit 7fc5d47 into main Aug 12, 2025
55 checks passed
@ohbh ohbh deleted the oh/into_batches_distributed branch August 12, 2025 21:52