14 changes: 2 additions & 12 deletions python/ray/data/dataset.py
@@ -3811,9 +3811,7 @@ def iterator(self) -> DataIterator:
 
     @ConsumptionAPI
     @PublicAPI(api_group=CD_API_GROUP)
-    def iter_rows(
-        self, *, prefetch_batches: int = 1, prefetch_blocks: int = 0
-    ) -> Iterable[Dict[str, Any]]:
+    def iter_rows(self) -> Iterable[Dict[str, Any]]:
         """Return an iterable over the rows in this dataset.
 
         Examples:
@@ -3826,18 +3824,10 @@ def iter_rows(
 
         Time complexity: O(1)
 
-        Args:
-            prefetch_batches: The number of batches to prefetch ahead of the current
-                batch during the scan.
-            prefetch_blocks: This argument is deprecated. Use ``prefetch_batches``
-                instead.
-
         Returns:
             An iterable over the rows in this dataset.
         """
-        return self.iterator().iter_rows(
-            prefetch_batches=prefetch_batches, prefetch_blocks=prefetch_blocks
-        )
+        return self.iterator().iter_rows()
 
     @ConsumptionAPI
     @PublicAPI(api_group=CD_API_GROUP)
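For context, a minimal usage sketch of the simplified call (assumes a working Ray installation; ray.data.range is used here only to build a small demo dataset):

import ray

ds = ray.data.range(5)

# iter_rows() now takes no prefetch_batches / prefetch_blocks arguments.
for row in ds.iter_rows():
    print(row)  # e.g. {'id': 0}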
33 changes: 4 additions & 29 deletions python/ray/data/iterator.py
@@ -1,6 +1,5 @@
 import abc
 import time
-import warnings
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -188,9 +187,7 @@ def _create_iterator() -> Iterator[DataBatch]:
     def _get_dataset_tag(self) -> str:
         return "unknown_dataset"
 
-    def iter_rows(
-        self, *, prefetch_batches: int = 1, prefetch_blocks: int = 0
-    ) -> Iterable[Dict[str, Any]]:
+    def iter_rows(self) -> Iterable[Dict[str, Any]]:
         """Return a local row iterable over the dataset.
 
         If the dataset is a tabular dataset (Arrow/Pandas blocks), dicts
@@ -205,34 +202,12 @@ def iter_rows(
 
         Time complexity: O(1)
 
-        Args:
-            prefetch_batches: The number of batches to prefetch ahead of the current
-                batch during the scan.
-            prefetch_blocks: This argument is deprecated. Use ``prefetch_batches``
-                instead.
-
         Returns:
             An iterable over rows of the dataset.
         """
-        iter_batch_args = {
-            "batch_size": None,
-            "batch_format": None,
-            "prefetch_batches": prefetch_batches,
-        }
-        if prefetch_blocks > 0:
-            warnings.warn(
-                "`prefetch_blocks` is deprecated in Ray 2.10. Use "
-                "the `prefetch_batches` parameter to specify the amount of prefetching "
-                "in terms of batches instead of blocks.",
-                DeprecationWarning,
-            )
-            iter_batch_args["prefetch_batches"] = prefetch_blocks
-        if prefetch_batches != 1:
-            warnings.warn(
-                "`prefetch_batches` is deprecated in Ray 2.12.", DeprecationWarning
-            )
-
-        batch_iterable = self.iter_batches(**iter_batch_args)
+        batch_iterable = self.iter_batches(
+            batch_size=None, batch_format=None, prefetch_batches=1
+        )
 
         def _wrapped_iterator():
             for batch in batch_iterable:
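In effect, iter_rows now simply delegates to iter_batches with fixed arguments and flattens each batch into per-row dicts. A simplified sketch of that idea (hypothetical helper, not the exact Ray internals, which go through block/batch utilities and handle non-tabular formats):

from typing import Any, Dict, Iterable

def iter_rows_sketch(it) -> Iterable[Dict[str, Any]]:
    # batch_size=None yields whole blocks; prefetching is pinned to one batch.
    batches = it.iter_batches(batch_size=None, batch_format=None, prefetch_batches=1)
    for batch in batches:
        # Assumes a columnar dict batch for illustration only.
        columns = list(batch.keys())
        num_rows = len(batch[columns[0]]) if columns else 0
        for i in range(num_rows):
            yield {name: batch[name][i] for name in columns}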
5 changes: 0 additions & 5 deletions python/ray/data/tests/test_consumption.py
@@ -789,11 +789,6 @@ def to_pylist(table):
         assert isinstance(row, dict)
         assert row == df_row.to_dict()
 
-    # Prefetch.
-    for row, t_row in zip(ds.iter_rows(prefetch_batches=1), to_pylist(t)):
-        assert isinstance(row, dict)
-        assert row == t_row
-
 
 def test_iter_batches_basic(ray_start_regular_shared):
     df1 = pd.DataFrame({"one": [1, 2, 3], "two": [2, 3, 4]})
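The surviving assertions keep the same pattern, just without the prefetch variant; a condensed sketch of that check (assumes ray and pandas are installed):

import pandas as pd
import ray

df = pd.DataFrame({"one": [1, 2, 3], "two": [2, 3, 4]})
ds = ray.data.from_pandas(df)

# Each yielded row should be a plain dict matching the source DataFrame row.
for row, (_, df_row) in zip(ds.iter_rows(), df.iterrows()):
    assert isinstance(row, dict)
    assert row == df_row.to_dict()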