[Refactor] [Data]: test_limit_pushdown_conservative - split tests & fix ordering assumption #58746
Code Review
This pull request is a great refactoring effort, splitting a large, multi-case test function into ten smaller, more focused tests. This significantly improves the readability and maintainability of the test suite. The fixes for ordering assumptions by adding check_ordering=False are also correct and well-justified for parallel execution and union operations. I've added a few suggestions to further enhance the refactoring by replacing helper identity functions with more concise lambda expressions.
    def f1(x):
        return x


    ds = ray.data.range(100, override_num_blocks=100).map(f1).limit(1)
    _check_valid_plan_and_result(
        ds, "Read[ReadRange] -> Limit[limit=1] -> MapRows[Map(f1)]", [{"id": 0}]
    )
For conciseness and better readability, the f1 identity function can be replaced with an inline lambda x: x. This change also requires updating the expected plan string to use Map(<lambda>) instead of Map(f1).
-    def f1(x):
-        return x
-    ds = ray.data.range(100, override_num_blocks=100).map(f1).limit(1)
-    _check_valid_plan_and_result(
-        ds, "Read[ReadRange] -> Limit[limit=1] -> MapRows[Map(f1)]", [{"id": 0}]
-    )
+    ds = ray.data.range(100, override_num_blocks=100).map(lambda x: x).limit(1)
+    _check_valid_plan_and_result(
+        ds, "Read[ReadRange] -> Limit[limit=1] -> MapRows[Map(<lambda>)]", [{"id": 0}]
+    )
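The expected plan string has to change because Python assigns every lambda the name `<lambda>`. The sketch below shows this in plain Python. It assumes, as the plan strings in this thread suggest, that the operator label is built from the function's `__name__`; `describe_map` is a hypothetical helper for illustration only, not a Ray Data API.

```python
def f1(x):
    return x


def describe_map(fn):
    # Hypothetical stand-in for however the plan label is produced;
    # it only demonstrates the effect of the function's __name__.
    return f"MapRows[Map({fn.__name__})]"


named_label = describe_map(f1)
lambda_label = describe_map(lambda x: x)

print(named_label)
print(lambda_label)
```

A named function keeps its own name in the label, so a test that asserts on the plan string is tied to whichever form the UDF takes.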
    def f2(x):
        return x


    ds = ray.data.range(100, override_num_blocks=100).map_batches(f2).limit(1)
    _check_valid_plan_and_result(
        ds,
        "Read[ReadRange] -> Limit[limit=1] -> MapBatches[MapBatches(f2)]",
        [{"id": 0}],
    )
Similar to other tests, the f2 identity function can be replaced with a lambda x: x for better code conciseness. The expected plan string should be updated accordingly.
    ds = ray.data.range(100, override_num_blocks=100).map_batches(lambda x: x).limit(1)
    _check_valid_plan_and_result(
        ds,
        "Read[ReadRange] -> Limit[limit=1] -> MapBatches[MapBatches(<lambda>)]",
        [{"id": 0}],
    )
PTAL @owenowenisme @bveeramani
        return x


    # Test 1: Basic Limit -> Limit fusion (should still work)
    def test_limit_pushdown_basic_limit_fusion(ray_start_regular_shared_2_cpus):
Except for the tests listed below, the Ray Data pipelines in this diff aren't guaranteed to produce rows in a specific output order. So, if you don't set check_ordering=False, the tests might raise false negatives.
Could you update the appropriate tests to use check_ordering=False?
List of tests with guaranteed order (okay to use check_ordering=True):
- test_limit_pushdown_union_with_sort
- test_limit_pushdown_complex_interweaved_operations
- test_limit_pushdown_stops_at_sort
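To illustrate why the ordering flag matters, here is a minimal sketch of an order-insensitive row comparison in plain Python. `rows_match` is a hypothetical helper written for this note; it is not the actual `_check_valid_plan_and_result` implementation, and it assumes rows are flat dictionaries.

```python
def rows_match(actual, expected, check_ordering=True):
    """Compare two lists of row dicts, optionally ignoring row order."""
    if check_ordering:
        return actual == expected

    # Sort both sides by a canonical string key so that rows arriving
    # in a different order still compare equal.
    def key(row):
        return repr(sorted(row.items()))

    return sorted(actual, key=key) == sorted(expected, key=key)


expected = [{"id": 0}, {"id": 1}, {"id": 2}]
# Same rows, but in the order they might arrive when blocks interleave.
interleaved = [{"id": 2}, {"id": 0}, {"id": 1}]

strict = rows_match(interleaved, expected, check_ordering=True)
relaxed = rows_match(interleaved, expected, check_ordering=False)
print(strict, relaxed)
```

With the strict comparison, a correct pipeline can still fail the test purely because of row arrival order, which is the failure mode the comment above asks to avoid.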
Bug: Missing check_ordering parameter in parametrized test (Bugbot Rules)
The test test_limit_pushdown_udf_modifying_row_count_with_map_batches calls _check_valid_plan_and_result without check_ordering=False, but the pipeline doesn't include a .sort() operation to guarantee row order. According to the reviewer's feedback, all tests without guaranteed order need check_ordering=False to avoid false negatives from non-deterministic row ordering in Ray Data pipelines.
python/ray/data/tests/test_execution_optimizer_limit_pushdown.py#L594-L599
@bveeramani Sorry for the late response. I walked through the code but still wasn’t able to conclude safely. I’ve gone ahead and updated them with
8865ad2 to 5099f4c
Signed-off-by: ryankert01 <[email protected]>
5099f4c to a7f5a06
bveeramani left a comment
LGTM!
Just enabled auto-merge. ty for the contribution!
…ix ordering assumption (ray-project#58746)

## Description

**Split multi-case test function**
- `test_limit_pushdown_conservative` → 10 separate tests (basic fusion, limit fusion reversed, multiple limit fusion, maprows, mapbatches, filter, project, sort, complex interweaved operations, and between two map operators)

**Fixed ordering assumptions**
- Added `check_ordering=False` to union tests (blocks may interleave)
- Added `check_ordering=False` to project test with `override_num_blocks` (parallel execution)

## Related issues

Related to ray-project#58655

## Additional information

---------

Signed-off-by: ryankert01 <[email protected]>
Signed-off-by: YK <[email protected]>
Description
Split multi-case test function
- test_limit_pushdown_conservative → 10 separate tests (basic fusion, limit fusion reversed, multiple limit fusion, maprows, mapbatches, filter, project, sort, complex interweaved operations, and between two map operators)
Fixed ordering assumptions
- Added check_ordering=False to union tests (blocks may interleave)
- Added check_ordering=False to project test with override_num_blocks (parallel execution)
Related issues
Related to #58655
Additional information