1 change: 0 additions & 1 deletion doc/source/data/api/dataset.rst
@@ -43,4 +43,3 @@ Deprecated API
:toctree: doc/

Dataset.iter_tf_batches
Dataset.to_torch
131 changes: 0 additions & 131 deletions python/ray/data/dataset.py
@@ -5214,137 +5214,6 @@ def iter_tf_batches(
local_shuffle_seed=local_shuffle_seed,
)

@ConsumptionAPI(pattern="Time complexity:")
@Deprecated
def to_torch(
self,
*,
label_column: Optional[str] = None,
feature_columns: Optional[
Union[List[str], List[List[str]], Dict[str, List[str]]]
] = None,
label_column_dtype: Optional["torch.dtype"] = None,
feature_column_dtypes: Optional[
Union["torch.dtype", List["torch.dtype"], Dict[str, "torch.dtype"]]
] = None,
batch_size: int = 1,
prefetch_batches: int = 1,
drop_last: bool = False,
local_shuffle_buffer_size: Optional[int] = None,
local_shuffle_seed: Optional[int] = None,
unsqueeze_label_tensor: bool = True,
unsqueeze_feature_tensors: bool = True,
) -> "torch.utils.data.IterableDataset":
"""Return a
`Torch IterableDataset <https://pytorch.org/docs/stable/data.html#torch.utils.data.IterableDataset>`_
over this :class:`~ray.data.Dataset`.

This is only supported for datasets convertible to Arrow records.

It is recommended to use the returned ``IterableDataset`` directly
instead of passing it into a torch ``DataLoader``.

Each element in the ``IterableDataset`` is a two-element
tuple. The first item contains the feature tensor(s), and the
second item is the label tensor. These can take different
forms, depending on the specified arguments.

For the features tensor (N is the ``batch_size`` and n, m, k
are the number of features per tensor):

* If ``feature_columns`` is a ``List[str]``, the features are a single
tensor of shape (N, n), with columns corresponding to
``feature_columns``.

* If ``feature_columns`` is a ``List[List[str]]``, the features are a
list of tensors of shape [(N, m),...,(N, k)], with the columns of each
tensor corresponding to the elements of ``feature_columns``.

* If ``feature_columns`` is a ``Dict[str, List[str]]``, the features
are a dict of key-tensor pairs of shape
{key1: (N, m),..., keyN: (N, k)}, with the columns of each
tensor corresponding to the value of ``feature_columns`` under the
key.

If ``unsqueeze_label_tensor=True`` (default), the label tensor is
of shape (N, 1). Otherwise, it is of shape (N,).
If ``label_column`` is specified as ``None``, then no column from the
``Dataset`` is treated as the label, and the output label tensor
is ``None``.

Note that you probably want to call :meth:`Dataset.split` on this dataset
if multiple Torch workers will be consuming the data.

Time complexity: O(1)

Args:
label_column: The name of the column used as the
label (second element of the output tuple). Can be None for
prediction, in which case the second element of the returned
tuple will also be None.
feature_columns: The names of the columns
to use as the features. Can be a list of lists or
a dict of string-list pairs for multi-tensor output.
If ``None``, then use all columns except the label column as
the features.
label_column_dtype: The torch dtype to
use for the label column. If ``None``, then automatically infer
the dtype.
feature_column_dtypes: The dtypes to use for the feature
tensors. This should match the format of ``feature_columns``,
or be a single dtype, in which case it is applied to
all tensors. If ``None``, then automatically infer the dtype.
batch_size: How many samples per batch to yield at a time.
Defaults to 1.
prefetch_batches: The number of batches to fetch ahead of the
current batch. If set to greater than 0, a separate threadpool is used
to fetch the objects to the local node, format the batches, and apply
the collate_fn. Defaults to 1.
drop_last: Set to True to drop the last incomplete batch
if the dataset size is not divisible by the batch size. If
False and the dataset size is not divisible by the batch
size, then the last batch is smaller. Defaults to False.
local_shuffle_buffer_size: If non-None, the data is randomly shuffled
using a local in-memory shuffle buffer, and this value will serve as the
minimum number of rows that must be in the local in-memory shuffle
buffer in order to yield a batch. When there are no more rows to add to
the buffer, the remaining rows in the buffer are drained. This
buffer size must be greater than or equal to ``batch_size``, and
therefore ``batch_size`` must also be specified when using local
shuffling.
local_shuffle_seed: The seed to use for the local random shuffle.
unsqueeze_label_tensor: If set to True, the label tensor
is unsqueezed (reshaped to (N, 1)). Otherwise, it is
left as is, that is, (N,). In general, regression loss
functions expect an unsqueezed tensor, while classification
loss functions expect a squeezed one. Defaults to True.
unsqueeze_feature_tensors: If set to True, the feature tensors
are unsqueezed (reshaped to (N, 1)) before being concatenated into
the final features tensor. Otherwise, they are left as is, that is,
(N,). Defaults to True.

Returns:
A `Torch IterableDataset`_.
""" # noqa: E501
warnings.warn(
"`to_torch` is deprecated and will be removed after May 2025. Use "
"`iter_torch_batches` instead.",
DeprecationWarning,
)
return self.iterator().to_torch(
label_column=label_column,
feature_columns=feature_columns,
label_column_dtype=label_column_dtype,
feature_column_dtypes=feature_column_dtypes,
batch_size=batch_size,
prefetch_batches=prefetch_batches,
drop_last=drop_last,
local_shuffle_buffer_size=local_shuffle_buffer_size,
local_shuffle_seed=local_shuffle_seed,
unsqueeze_label_tensor=unsqueeze_label_tensor,
unsqueeze_feature_tensors=unsqueeze_feature_tensors,
)
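
For reference, a sketch of the three ``feature_columns`` forms the removed signature accepted, given some Dataset ``ds`` with a hypothetical label column "y" and feature columns "a", "b", "c" (shapes follow the docstring above):

    # List[str]: features form one tensor of shape (N, 2).
    ds.to_torch(label_column="y", feature_columns=["a", "b"])

    # List[List[str]]: features form a list of tensors, [(N, 2), (N, 1)].
    ds.to_torch(label_column="y", feature_columns=[["a", "b"], ["c"]])

    # Dict[str, List[str]]: features form a dict, {"left": (N, 2), "right": (N, 1)}.
    ds.to_torch(label_column="y", feature_columns={"left": ["a", "b"], "right": ["c"]})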

@ConsumptionAPI
@PublicAPI(api_group=IOC_API_GROUP)
def to_tf(
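As a hedged migration sketch (not part of this diff): the deprecation warning points callers at ``iter_torch_batches``, which yields dicts keyed by column name rather than ``(features, label)`` tuples, so callers split out the label themselves. The "feature"/"label" column names below are illustrative.

    import ray
    import torch

    ds = ray.data.from_items([{"feature": float(i), "label": i % 2} for i in range(8)])

    # Old (removed): for features, labels in ds.to_torch(label_column="label", batch_size=4): ...
    # New: each batch is a Dict[str, torch.Tensor].
    for batch in ds.iter_torch_batches(batch_size=4):
        features, labels = batch["feature"], batch["label"]
        assert isinstance(features, torch.Tensor)
        # to_torch returned (N, 1) feature tensors by default; unsqueeze here if needed.
        features = features.unsqueeze(1)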
13 changes: 0 additions & 13 deletions python/ray/data/tests/test_object_gc.py
@@ -33,17 +33,6 @@ def _all_executor_threads_exited():
wait_for_condition(_all_executor_threads_exited, timeout=10, retry_interval_ms=1000)


def check_to_torch_no_spill(ctx, dataset):
# Iterate over the dataset for 10 epochs to stress test that
# no spilling will happen.
max_epoch = 10
for _ in range(max_epoch):
for _ in dataset.to_torch(batch_size=None):
pass
meminfo = memory_summary(ctx.address_info["address"], stats_only=True)
assert "Spilled" not in meminfo, meminfo


def check_iter_torch_batches_no_spill(ctx, dataset):
# Iterate over the dataset for 10 epochs to stress test that
# no spilling will happen.
@@ -93,8 +82,6 @@ def test_torch_iteration(shutdown_only):
# The size of dataset is 500*(80*80*4)*8B, about 100MB.
ds = ray.data.range_tensor(500, shape=(80, 80, 4), override_num_blocks=100)

# to_torch
check_to_torch_no_spill(ctx, ds)
# iter_torch_batches
check_iter_torch_batches_no_spill(ctx, ds)

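The retained ``check_iter_torch_batches_no_spill`` presumably mirrors the helper removed above; a sketch under that assumption (its actual body is elided from this diff), reusing the ``memory_summary`` helper the file already imports:

    def check_iter_torch_batches_no_spill(ctx, dataset):
        # Iterate over the dataset for 10 epochs to stress test that
        # no spilling will happen.
        max_epoch = 10
        for _ in range(max_epoch):
            for _ in dataset.iter_torch_batches(batch_size=None):
                pass
        meminfo = memory_summary(ctx.address_info["address"], stats_only=True)
        assert "Spilled" not in meminfo, meminfo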
14 changes: 0 additions & 14 deletions python/ray/data/tests/test_raydp.py
@@ -1,7 +1,6 @@
import pandas
import pytest
import raydp
import torch

import ray
from ray.data.tests.conftest import * # noqa
@@ -58,19 +57,6 @@ def test_from_spark_e2e(spark):
_check_usage_record(["FromArrow"])


def test_raydp_to_torch_iter(spark):
spark_df = spark.createDataFrame([(1, 0), (2, 0), (3, 1)], ["feature", "label"])
data_size = spark_df.count()
features = [r["feature"] for r in spark_df.take(data_size)]
features = torch.tensor(features).reshape(data_size, 1)
labels = [r["label"] for r in spark_df.take(data_size)]
labels = torch.tensor(labels).reshape(data_size, 1)
ds = ray.data.from_spark(spark_df)
dataset = ds.to_torch(label_column="label", batch_size=3)
data_features, data_labels = next(dataset.__iter__())
assert torch.equal(data_features, features) and torch.equal(data_labels, labels)


def test_to_pandas(spark):
df = spark.range(100)
ds = ray.data.from_spark(df)
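If equivalent RayDP coverage is wanted without ``to_torch``, the removed test could be ported along these lines (a sketch, assuming the same ``spark`` fixture and re-adding the ``torch`` import dropped above):

    import torch

    def test_raydp_iter_torch_batches(spark):
        spark_df = spark.createDataFrame([(1, 0), (2, 0), (3, 1)], ["feature", "label"])
        ds = ray.data.from_spark(spark_df)
        # Batches are dicts of column-name -> tensor; no label_column split.
        batch = next(iter(ds.iter_torch_batches(batch_size=3)))
        assert torch.equal(batch["feature"], torch.tensor([1, 2, 3]))
        assert torch.equal(batch["label"], torch.tensor([0, 0, 1]))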