Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/ray/data/_internal/planner/plan_udf_map_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def plan_streaming_repartition_op(
data_context,
name=op.name,
compute_strategy=compute,
min_rows_per_bundle=op.target_num_rows_per_block,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: MapOperator Parameter Validation Issue

The min_rows_per_bundle parameter for MapOperator.create is assigned op.target_num_rows_per_block without validation. If op.target_num_rows_per_block is None, zero, or negative, MapOperator may behave unexpectedly, since a positive integer is likely expected.

Fix in Cursor Fix in Web

ray_remote_args=op._ray_remote_args,
ray_remote_args_fn=op._ray_remote_args_fn,
supports_fusion=False,
Expand Down
38 changes: 25 additions & 13 deletions python/ray/data/tests/test_repartition_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,42 +127,54 @@ def test_repartition_shuffle_arrow(


@pytest.mark.parametrize(
"total_rows,target_num_rows_per_block",
"total_rows,target_num_rows_per_block,expected_num_blocks",
[
(128, 1),
(128, 2),
(128, 4),
(128, 8),
(128, 128),
(128, 1, 128),
(128, 2, 64),
(128, 4, 32),
(128, 8, 16),
(128, 128, 1),
],
)
def test_repartition_target_num_rows_per_block(
ray_start_regular_shared_2_cpus,
total_rows,
target_num_rows_per_block,
expected_num_blocks,
disable_fallback_to_object_extension,
):
ds = ray.data.range(total_rows).repartition(
num_blocks = 16

# Each block is 8 ints
ds = ray.data.range(total_rows, override_num_blocks=num_blocks).repartition(
target_num_rows_per_block=target_num_rows_per_block,
)
rows_count = 0

num_blocks = 0
num_rows = 0
all_data = []

for ref_bundle in ds.iter_internal_ref_bundles():
block, block_metadata = (
ray.get(ref_bundle.blocks[0][0]),
ref_bundle.blocks[0][1],
)
assert block_metadata.num_rows <= target_num_rows_per_block
rows_count += block_metadata.num_rows

# NOTE: Because our block rows % target_num_rows_per_block == 0, we can
# assert equality here
assert block_metadata.num_rows == target_num_rows_per_block

num_blocks += 1
num_rows += block_metadata.num_rows

block_data = (
BlockAccessor.for_block(block).to_pandas().to_dict(orient="records")
)
all_data.extend(block_data)

assert rows_count == total_rows

# Verify total rows match
assert rows_count == total_rows
assert num_rows == total_rows
assert num_blocks == expected_num_blocks

# Verify data consistency
all_values = [row["id"] for row in all_data]
Expand Down