
Commit

[data] Update num_rows_per_file to min_rows_per_file (ray-project#49978)
## Why are these changes needed?

`num_rows_per_file` implies "exact" semantics, whereas the underlying
implementation actually provides "at least" semantics.

This PR renames the parameter to `min_rows_per_file` to reflect that, though we
should double-check whether this is the best way to expose the behavior.

Closes ray-project#45393
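
For illustration, a minimal sketch of the user-facing change (assuming the deprecation shim added in this PR keeps the old keyword working for now):

```python
import ray

ds = ray.data.range(100)

# New keyword: a lower bound, i.e. "write at least this many rows per file".
ds.write_parquet("/tmp/out_new/", min_rows_per_file=25)

# Old keyword: still accepted for the moment, but it emits a DeprecationWarning
# and is mapped to min_rows_per_file internally.
ds.write_parquet("/tmp/out_old/", num_rows_per_file=25)
```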

## Related issue number

<!-- For example: "Closes ray-project#1234" -->

## Checks

- [ ] I've signed off every commit (by using the `-s` flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [ ] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: Richard Liaw <[email protected]>
Co-authored-by: Balaji Veeramani <[email protected]>
Signed-off-by: Puyuan Yao <[email protected]>
2 people authored and anyadontfly committed Feb 13, 2025
1 parent 3856df6 commit d2d3764
Showing 15 changed files with 175 additions and 104 deletions.
11 changes: 7 additions & 4 deletions doc/source/data/saving-data.rst
@@ -143,20 +143,23 @@ Changing the number of output files
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When you call a write method, Ray Data writes your data to several files. To control the
number of output files, configure ``num_rows_per_file``.
number of output files, configure ``min_rows_per_file``.

.. note::

``num_rows_per_file`` is a hint, not a strict limit. Ray Data might write more or
fewer rows to each file.
``min_rows_per_file`` is a hint, not a strict limit. Ray Data might write more or
fewer rows to each file. Under the hood, if a block contains more rows than the
specified value, Ray Data writes all of that block's rows to a single file.


.. testcode::

import os
import ray

ds = ray.data.read_csv("s3://anonymous@ray-example-data/iris.csv")
ds.write_csv("/tmp/few_files/", num_rows_per_file=75)
ds.write_csv("/tmp/few_files/", min_rows_per_write=75)

print(os.listdir("/tmp/few_files/"))
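
As a rough illustration of the "at least" behavior described in the note above (a sketch, assuming the 100 rows land in two 50-row blocks), a small ``min_rows_per_file`` is exceeded because each output file receives a whole block:

.. testcode::

    import os
    import ray

    # Two blocks of 50 rows each. Each block holds more rows than
    # min_rows_per_file, so each output file contains a full block (50 rows).
    ds = ray.data.range(100, override_num_blocks=2)
    ds.write_csv("/tmp/large_block_files/", min_rows_per_file=10)

    print(os.listdir("/tmp/large_block_files/"))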

8 changes: 4 additions & 4 deletions python/ray/data/_internal/datasource/parquet_datasink.py
@@ -28,7 +28,7 @@ def __init__(
partition_cols: Optional[List[str]] = None,
arrow_parquet_args_fn: Optional[Callable[[], Dict[str, Any]]] = None,
arrow_parquet_args: Optional[Dict[str, Any]] = None,
num_rows_per_file: Optional[int] = None,
min_rows_per_file: Optional[int] = None,
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
try_create_dir: bool = True,
open_stream_args: Optional[Dict[str, Any]] = None,
@@ -43,7 +43,7 @@ def __init__(

self.arrow_parquet_args_fn = arrow_parquet_args_fn
self.arrow_parquet_args = arrow_parquet_args
self.num_rows_per_file = num_rows_per_file
self.min_rows_per_file = min_rows_per_file
self.partition_cols = partition_cols

super().__init__(
@@ -168,5 +168,5 @@ def _write_partition_files(
writer.write_table(group_table)

@property
def num_rows_per_write(self) -> Optional[int]:
return self.num_rows_per_file
def min_rows_per_write(self) -> Optional[int]:
return self.min_rows_per_file
@@ -19,7 +19,7 @@ def __init__(
):
if isinstance(datasink_or_legacy_datasource, Datasink):
min_rows_per_bundled_input = (
datasink_or_legacy_datasource.num_rows_per_write
datasink_or_legacy_datasource.min_rows_per_write
)
else:
min_rows_per_bundled_input = None
30 changes: 30 additions & 0 deletions python/ray/data/_internal/util.py
@@ -1216,6 +1216,36 @@ def convert_bytes_to_human_readable_str(num_bytes: int) -> str:
return num_bytes_str


def _validate_rows_per_file_args(
*, num_rows_per_file: Optional[int] = None, min_rows_per_file: Optional[int] = None
) -> Optional[int]:
"""Helper method to validate and handle rows per file arguments.
Args:
num_rows_per_file: Deprecated parameter for number of rows per file
min_rows_per_file: New parameter for minimum rows per file
Returns:
The effective min_rows_per_file value to use
"""
if num_rows_per_file is not None:
import warnings

warnings.warn(
"`num_rows_per_file` is deprecated and will be removed in a future release. "
"Use `min_rows_per_file` instead.",
DeprecationWarning,
stacklevel=3,
)
if min_rows_per_file is not None:
raise ValueError(
"Cannot specify both `num_rows_per_file` and `min_rows_per_file`. "
"Use `min_rows_per_file` as `num_rows_per_file` is deprecated."
)
return num_rows_per_file
return min_rows_per_file
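
As a hypothetical illustration (not part of this diff), this is roughly how a write method could route both keywords through the helper so the deprecated name keeps working; `write_files` is an invented name, not a real Ray API:

```python
from typing import Optional

# The helper shown above lives in python/ray/data/_internal/util.py after this change.
from ray.data._internal.util import _validate_rows_per_file_args


def write_files(
    path: str,
    *,
    num_rows_per_file: Optional[int] = None,
    min_rows_per_file: Optional[int] = None,
) -> Optional[int]:
    effective_min_rows = _validate_rows_per_file_args(
        num_rows_per_file=num_rows_per_file,
        min_rows_per_file=min_rows_per_file,
    )
    # ... pass effective_min_rows down to the datasink ...
    return effective_min_rows


write_files("/tmp/out", num_rows_per_file=10)   # warns, returns 10
write_files("/tmp/out", min_rows_per_file=10)   # returns 10
# Passing both keywords raises a ValueError.
```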


def is_nan(value):
try:
return isinstance(value, float) and np.isnan(value)
144 changes: 91 additions & 53 deletions python/ray/data/dataset.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion python/ray/data/datasource/datasink.py
@@ -103,7 +103,7 @@ def supports_distributed_writes(self) -> bool:
return True

@property
def num_rows_per_write(self) -> Optional[int]:
def min_rows_per_write(self) -> Optional[int]:
"""The target number of rows to pass to each :meth:`~ray.data.Datasink.write` call.
If ``None``, Ray Data passes a system-chosen number of rows.
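
For reference, a minimal sketch of a custom ``Datasink`` that overrides this property (mirroring the mock datasink used in the tests below; the class name, import paths, and the value 32 are illustrative assumptions):

```python
from typing import Iterable, Optional

import ray
from ray.data._internal.execution.interfaces import TaskContext
from ray.data.block import Block
from ray.data.datasource import Datasink


class CountingDatasink(Datasink[None]):
    """Illustrative sink that asks Ray Data for at least 32 rows per write call."""

    def write(self, blocks: Iterable[Block], ctx: TaskContext) -> None:
        total_rows = sum(len(block) for block in blocks)
        print(f"write() received {total_rows} rows")

    @property
    def min_rows_per_write(self) -> Optional[int]:
        return 32


ray.data.range(100).write_datasink(CountingDatasink())
```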
8 changes: 4 additions & 4 deletions python/ray/data/datasource/file_datasink.py
@@ -227,11 +227,11 @@ def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
""" # noqa: E501

def __init__(
self, path, *, num_rows_per_file: Optional[int] = None, **file_datasink_kwargs
self, path, *, min_rows_per_file: Optional[int] = None, **file_datasink_kwargs
):
super().__init__(path, **file_datasink_kwargs)

self._num_rows_per_file = num_rows_per_file
self._min_rows_per_file = min_rows_per_file

def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
"""Write a block of data to a file.
@@ -262,5 +262,5 @@ def write_block_to_path():
)

@property
def num_rows_per_write(self) -> Optional[int]:
return self._num_rows_per_file
def min_rows_per_write(self) -> Optional[int]:
return self._min_rows_per_file
8 changes: 4 additions & 4 deletions python/ray/data/tests/test_csv.py
@@ -825,17 +825,17 @@ def test_csv_invalid_file_handler(ray_start_regular_shared, tmp_path):
)


@pytest.mark.parametrize("num_rows_per_file", [5, 10, 50])
def test_write_num_rows_per_file(tmp_path, ray_start_regular_shared, num_rows_per_file):
@pytest.mark.parametrize("min_rows_per_file", [5, 10, 50])
def test_write_min_rows_per_file(tmp_path, ray_start_regular_shared, min_rows_per_file):
ray.data.range(100, override_num_blocks=20).write_csv(
tmp_path, num_rows_per_file=num_rows_per_file
tmp_path, min_rows_per_file=min_rows_per_file
)

for filename in os.listdir(tmp_path):
with open(os.path.join(tmp_path, filename), "r") as file:
# Subtract 1 from the number of lines to account for the header.
num_rows_written = len(file.read().splitlines()) - 1
assert num_rows_written == num_rows_per_file
assert num_rows_written == min_rows_per_file


if __name__ == "__main__":
16 changes: 8 additions & 8 deletions python/ray/data/tests/test_datasink.py
@@ -105,21 +105,21 @@ def get_node_id():
assert node_ids == {bar_node_id}


@pytest.mark.parametrize("num_rows_per_write", [5, 10, 50])
def test_num_rows_per_write(tmp_path, ray_start_regular_shared, num_rows_per_write):
@pytest.mark.parametrize("min_rows_per_write", [5, 10, 50])
def test_min_rows_per_write(tmp_path, ray_start_regular_shared, min_rows_per_write):
class MockDatasink(Datasink[None]):
def __init__(self, num_rows_per_write):
self._num_rows_per_write = num_rows_per_write
def __init__(self, min_rows_per_write):
self._min_rows_per_write = min_rows_per_write

def write(self, blocks: Iterable[Block], ctx: TaskContext) -> None:
assert sum(len(block) for block in blocks) == self._num_rows_per_write
assert sum(len(block) for block in blocks) == self._min_rows_per_write

@property
def num_rows_per_write(self):
return self._num_rows_per_write
def min_rows_per_write(self):
return self._min_rows_per_write

ray.data.range(100, override_num_blocks=20).write_datasink(
MockDatasink(num_rows_per_write)
MockDatasink(min_rows_per_write)
)


8 changes: 4 additions & 4 deletions python/ray/data/tests/test_file_datasink.py
@@ -147,8 +147,8 @@ def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
assert os.path.isdir(path)


@pytest.mark.parametrize("num_rows_per_file", [5, 10, 50])
def test_write_num_rows_per_file(tmp_path, ray_start_regular_shared, num_rows_per_file):
@pytest.mark.parametrize("min_rows_per_file", [5, 10, 50])
def test_write_min_rows_per_file(tmp_path, ray_start_regular_shared, min_rows_per_file):
class MockFileDatasink(BlockBasedFileDatasink):
def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
for _ in range(block.num_rows()):
@@ -157,14 +157,14 @@ def write_block_to_file(self, block: BlockAccessor, file: "pyarrow.NativeFile"):
ds = ray.data.range(100, override_num_blocks=20)

ds.write_datasink(
MockFileDatasink(path=tmp_path, num_rows_per_file=num_rows_per_file)
MockFileDatasink(path=tmp_path, min_rows_per_file=min_rows_per_file)
)

num_rows_written_total = 0
for filename in os.listdir(tmp_path):
with open(os.path.join(tmp_path, filename), "r") as file:
num_rows_written = len(file.read().splitlines())
assert num_rows_written == num_rows_per_file
assert num_rows_written == min_rows_per_file
num_rows_written_total += num_rows_written

assert num_rows_written_total == 100
8 changes: 4 additions & 4 deletions python/ray/data/tests/test_json.py
@@ -645,16 +645,16 @@ def test_json_read_across_blocks(ray_start_regular_shared, fs, data_path, endpoi
dsdf = ds.to_pandas()


@pytest.mark.parametrize("num_rows_per_file", [5, 10, 50])
def test_write_num_rows_per_file(tmp_path, ray_start_regular_shared, num_rows_per_file):
@pytest.mark.parametrize("min_rows_per_file", [5, 10, 50])
def test_write_min_rows_per_file(tmp_path, ray_start_regular_shared, min_rows_per_file):
ray.data.range(100, override_num_blocks=20).write_json(
tmp_path, num_rows_per_file=num_rows_per_file
tmp_path, min_rows_per_file=min_rows_per_file
)

for filename in os.listdir(tmp_path):
with open(os.path.join(tmp_path, filename), "r") as file:
num_rows_written = len(file.read().splitlines())
assert num_rows_written == num_rows_per_file
assert num_rows_written == min_rows_per_file


def test_mixed_gzipped_json_files(ray_start_regular_shared, tmp_path):
8 changes: 4 additions & 4 deletions python/ray/data/tests/test_numpy.py
@@ -292,15 +292,15 @@ def test_numpy_write(ray_start_regular_shared, fs, data_path, endpoint_url):
np.testing.assert_equal(extract_values("data", ds.take(1)), [np.array([0])])


@pytest.mark.parametrize("num_rows_per_file", [5, 10, 50])
def test_write_num_rows_per_file(tmp_path, ray_start_regular_shared, num_rows_per_file):
@pytest.mark.parametrize("min_rows_per_file", [5, 10, 50])
def test_write_min_rows_per_file(tmp_path, ray_start_regular_shared, min_rows_per_file):
ray.data.range(100, override_num_blocks=20).write_numpy(
tmp_path, column="id", num_rows_per_file=num_rows_per_file
tmp_path, column="id", min_rows_per_file=min_rows_per_file
)

for filename in os.listdir(tmp_path):
array = np.load(os.path.join(tmp_path, filename))
assert len(array) == num_rows_per_file
assert len(array) == min_rows_per_file


if __name__ == "__main__":
10 changes: 5 additions & 5 deletions python/ray/data/tests/test_parquet.py
@@ -1210,17 +1210,17 @@ def test_parquet_bulk_columns(ray_start_regular_shared):
assert ds.columns() == ["variety"]


@pytest.mark.parametrize("num_rows_per_file", [5, 10, 50])
def test_write_num_rows_per_file(tmp_path, ray_start_regular_shared, num_rows_per_file):
@pytest.mark.parametrize("min_rows_per_file", [5, 10, 50])
def test_write_min_rows_per_file(tmp_path, ray_start_regular_shared, min_rows_per_file):
import pyarrow.parquet as pq

ray.data.range(100, override_num_blocks=20).write_parquet(
tmp_path, num_rows_per_file=num_rows_per_file
tmp_path, min_rows_per_file=min_rows_per_file
)

for filename in os.listdir(tmp_path):
table = pq.read_table(os.path.join(tmp_path, filename))
assert len(table) == num_rows_per_file
assert len(table) == min_rows_per_file


@pytest.mark.parametrize("shuffle", [True, False, "file"])
@@ -1393,7 +1393,7 @@ def test_write_auto_infer_nullable_fields(
ctx.target_max_block_size = 1
ds = ray.data.range(len(row_data)).map(lambda row: row_data[row["id"]])
# So we force writing to a single file.
ds.write_parquet(tmp_path, num_rows_per_file=2)
ds.write_parquet(tmp_path, min_rows_per_file=2)


def test_seed_file_shuffle(restore_data_context, tmp_path):
8 changes: 4 additions & 4 deletions python/ray/data/tests/test_tfrecords.py
@@ -742,15 +742,15 @@ def test_read_with_invalid_schema(
)


@pytest.mark.parametrize("num_rows_per_file", [5, 10, 50])
def test_write_num_rows_per_file(tmp_path, ray_start_regular_shared, num_rows_per_file):
@pytest.mark.parametrize("min_rows_per_file", [5, 10, 50])
def test_write_min_rows_per_file(tmp_path, ray_start_regular_shared, min_rows_per_file):
ray.data.range(100, override_num_blocks=20).write_tfrecords(
tmp_path, num_rows_per_file=num_rows_per_file
tmp_path, min_rows_per_file=min_rows_per_file
)

for filename in os.listdir(tmp_path):
dataset = tf.data.TFRecordDataset(os.path.join(tmp_path, filename))
assert len(list(dataset)) == num_rows_per_file
assert len(list(dataset)) == min_rows_per_file


def read_tfrecords_with_tfx_read_override(paths, tfx_read=False, **read_opts):
8 changes: 4 additions & 4 deletions python/ray/data/tests/test_webdataset.py
@@ -269,15 +269,15 @@ def test_webdataset_decoding(ray_start_2_cpus, tmp_path):
assert meta_json["e"]["img_filename"] == "for_test.jpg"


@pytest.mark.parametrize("num_rows_per_file", [5, 10, 50])
def test_write_num_rows_per_file(tmp_path, ray_start_regular_shared, num_rows_per_file):
@pytest.mark.parametrize("min_rows_per_file", [5, 10, 50])
def test_write_min_rows_per_file(tmp_path, ray_start_regular_shared, min_rows_per_file):
ray.data.from_items(
[{"id": str(i)} for i in range(100)], override_num_blocks=20
).write_webdataset(tmp_path, num_rows_per_file=num_rows_per_file)
).write_webdataset(tmp_path, min_rows_per_file=min_rows_per_file)

for filename in os.listdir(tmp_path):
dataset = wds.WebDataset(os.path.join(tmp_path, filename))
assert len(list(dataset)) == num_rows_per_file
assert len(list(dataset)) == min_rows_per_file


if __name__ == "__main__":
