-
Notifications
You must be signed in to change notification settings - Fork 7k
[data] Streaming executor fixes #2 #32759
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
edc51bd
61f4d6d
a33a943
36ebe52
ce6763e
0e2c29e
f2b6ed0
bb6c5c4
540fe79
edad7d0
60cc079
a3d3980
001579c
8aeed6c
7a9a49b
ef97167
6f0563c
bcec4d6
ddef4e5
fc9a175
f0e90b7
999d1de
d8159e3
d81cd02
bc831bb
c444395
642da6f
f713f2f
d416a73
da5acee
ab64cb6
7a5d5e3
47010ca
b9fcb5f
bfd724d
9baa73a
e1d0df5
94a88ee
c670a3b
2c608f4
c5a6aa9
6c82ad2
4cbcbae
82bc7a7
7e31d52
c4b7f42
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1369,17 +1369,26 @@ def test_count_lazy(ray_start_regular_shared): | |
|
|
||
| def test_lazy_loading_exponential_rampup(ray_start_regular_shared): | ||
| ds = ray.data.range(100, parallelism=20) | ||
| assert ds._plan.execute()._num_computed() == 0 | ||
|
|
||
| def check_num_computed(expected): | ||
| if ray.data.context.DatasetContext.get_current().use_streaming_executor: | ||
| # In streaming executor, ds.take() will not invoke partial execution | ||
| # in LazyBlocklist. | ||
| assert ds._plan.execute()._num_computed() == 0 | ||
| else: | ||
| assert ds._plan.execute()._num_computed() == expected | ||
|
|
||
| check_num_computed(0) | ||
| assert ds.take(10) == list(range(10)) | ||
| assert ds._plan.execute()._num_computed() == 2 | ||
| check_num_computed(2) | ||
| assert ds.take(20) == list(range(20)) | ||
| assert ds._plan.execute()._num_computed() == 4 | ||
| check_num_computed(4) | ||
| assert ds.take(30) == list(range(30)) | ||
| assert ds._plan.execute()._num_computed() == 8 | ||
| check_num_computed(8) | ||
| assert ds.take(50) == list(range(50)) | ||
| assert ds._plan.execute()._num_computed() == 16 | ||
| check_num_computed(16) | ||
| assert ds.take(100) == list(range(100)) | ||
| assert ds._plan.execute()._num_computed() == 20 | ||
| check_num_computed(20) | ||
|
|
||
|
|
||
| def test_dataset_repr(ray_start_regular_shared): | ||
|
|
@@ -1696,7 +1705,14 @@ def to_pylist(table): | |
| # Default ArrowRows. | ||
| for row, t_row in zip(ds.iter_rows(), to_pylist(t)): | ||
| assert isinstance(row, TableRow) | ||
| assert isinstance(row, ArrowRow) | ||
| # In streaming, we set batch_format to "default" because calling | ||
| # ds.dataset_format() will still invoke bulk execution and we want | ||
| # to avoid that. As a result, it's receiving PandasRow (the default | ||
| # batch format). | ||
| if ray.data.context.DatasetContext.get_current().use_streaming_executor: | ||
| assert isinstance(row, PandasRow) | ||
| else: | ||
| assert isinstance(row, ArrowRow) | ||
| assert row == t_row | ||
|
|
||
| # PandasRows after conversion. | ||
|
|
@@ -1710,7 +1726,14 @@ def to_pylist(table): | |
| # Prefetch. | ||
| for row, t_row in zip(ds.iter_rows(prefetch_blocks=1), to_pylist(t)): | ||
| assert isinstance(row, TableRow) | ||
| assert isinstance(row, ArrowRow) | ||
| # In streaming, we set batch_format to "default" because calling | ||
| # ds.dataset_format() will still invoke bulk execution and we want | ||
| # to avoid that. As a result, it's receiving PandasRow (the default | ||
| # batch format). | ||
| if ray.data.context.DatasetContext.get_current().use_streaming_executor: | ||
| assert isinstance(row, PandasRow) | ||
| else: | ||
| assert isinstance(row, ArrowRow) | ||
| assert row == t_row | ||
|
|
||
|
|
||
|
|
@@ -2181,7 +2204,12 @@ def test_lazy_loading_iter_batches_exponential_rampup(ray_start_regular_shared): | |
| ds = ray.data.range(32, parallelism=8) | ||
| expected_num_blocks = [1, 2, 4, 4, 8, 8, 8, 8] | ||
| for _, expected in zip(ds.iter_batches(batch_size=None), expected_num_blocks): | ||
| assert ds._plan.execute()._num_computed() == expected | ||
| if ray.data.context.DatasetContext.get_current().use_streaming_executor: | ||
| # In streaming execution of ds.iter_batches(), there is no partial | ||
| # execution so _num_computed() in LazyBlocklist is 0. | ||
| assert ds._plan.execute()._num_computed() == 0 | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is basically disabling this test. How about we just force it to use bulk executor instead?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This assertion still has value as it checks that streaming execution didn't run LazyBlocklist. |
||
| else: | ||
| assert ds._plan.execute()._num_computed() == expected | ||
|
|
||
|
|
||
| def test_add_column(ray_start_regular_shared): | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.