diff --git a/ci/lint/pydoclint-baseline.txt b/ci/lint/pydoclint-baseline.txt
index fa7ba88f5314..ce73c467d772 100644
--- a/ci/lint/pydoclint-baseline.txt
+++ b/ci/lint/pydoclint-baseline.txt
@@ -1317,7 +1317,6 @@ python/ray/data/read_api.py
     DOC101: Function `read_bigquery`: Docstring contains fewer arguments than in function signature.
     DOC103: Function `read_bigquery`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [query: Optional[str]].
     DOC103: Function `read_parquet`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**arrow_parquet_args: ]. Arguments in the docstring but not in the function signature: [arrow_parquet_args: ].
-    DOC103: Function `read_parquet_bulk`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**arrow_parquet_args: ]. Arguments in the docstring but not in the function signature: [arrow_parquet_args: ].
     DOC103: Function `read_json`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**arrow_json_args: ]. Arguments in the docstring but not in the function signature: [arrow_json_args: ].
     DOC103: Function `read_csv`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**arrow_csv_args: ]. Arguments in the docstring but not in the function signature: [arrow_csv_args: ].
     DOC101: Function `read_text`: Docstring contains fewer arguments than in function signature.
diff --git a/doc/source/data/api/input_output.rst b/doc/source/data/api/input_output.rst
index e8dcca1e13d4..d63279616785 100644
--- a/doc/source/data/api/input_output.rst
+++ b/doc/source/data/api/input_output.rst
@@ -32,7 +32,6 @@ Parquet
    :toctree: doc/
 
    read_parquet
-   read_parquet_bulk
    Dataset.write_parquet
 
 CSV
diff --git a/python/ray/data/__init__.py b/python/ray/data/__init__.py
index 76a07548d57c..b458afe20bfa 100644
--- a/python/ray/data/__init__.py
+++ b/python/ray/data/__init__.py
@@ -66,7 +66,6 @@
     read_mongo,
     read_numpy,
     read_parquet,
-    read_parquet_bulk,
     read_snowflake,
     read_sql,
     read_text,
@@ -175,7 +174,6 @@
     "read_numpy",
     "read_mongo",
     "read_parquet",
-    "read_parquet_bulk",
     "read_snowflake",
     "read_sql",
     "read_tfrecords",
diff --git a/python/ray/data/_internal/datasource/parquet_bulk_datasource.py b/python/ray/data/_internal/datasource/parquet_bulk_datasource.py
deleted file mode 100644
index 72d2015713ea..000000000000
--- a/python/ray/data/_internal/datasource/parquet_bulk_datasource.py
+++ /dev/null
@@ -1,51 +0,0 @@
-import logging
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
-
-from ray.data.datasource.file_based_datasource import FileBasedDatasource
-
-if TYPE_CHECKING:
-    import pyarrow
-
-
-logger = logging.getLogger(__name__)
-
-
-class ParquetBulkDatasource(FileBasedDatasource):
-    """Minimal Parquet datasource, for reading and writing Parquet files."""
-
-    _FILE_EXTENSIONS = ["parquet"]
-
-    def __init__(
-        self,
-        paths: Union[str, List[str]],
-        read_table_args: Optional[Dict[str, Any]] = None,
-        **file_based_datasource_kwargs,
-    ):
-        super().__init__(paths, **file_based_datasource_kwargs)
-
-        if read_table_args is None:
-            read_table_args = {}
-
-        self.read_table_args = read_table_args
-
-    def get_name(self):
-        """Return a human-readable name for this datasource.
-        This will be used as the names of the read tasks.
-        Note: overrides the base `FileBasedDatasource` method.
-        """
-        return "ParquetBulk"
-
-    def _read_stream(self, f: "pyarrow.NativeFile", path: str):
-        import pyarrow.parquet as pq
-
-        use_threads = self.read_table_args.pop("use_threads", False)
-        yield pq.read_table(f, use_threads=use_threads, **self.read_table_args)
-
-    def _open_input_source(
-        self,
-        filesystem: "pyarrow.fs.FileSystem",
-        path: str,
-        **open_args,
-    ) -> "pyarrow.NativeFile":
-        # Parquet requires `open_input_file` due to random access reads
-        return filesystem.open_input_file(path, **open_args)
diff --git a/python/ray/data/_internal/datasource/parquet_datasource.py b/python/ray/data/_internal/datasource/parquet_datasource.py
index 8731ba71fa2b..0f68599330ea 100644
--- a/python/ray/data/_internal/datasource/parquet_datasource.py
+++ b/python/ray/data/_internal/datasource/parquet_datasource.py
@@ -281,10 +281,9 @@ def combine_predicates(
 class ParquetDatasource(Datasource):
     """Parquet datasource, for reading and writing Parquet files.
 
-    The primary difference from ParquetBulkDatasource is that this uses
-    PyArrow's `ParquetDataset` abstraction for dataset reads, and thus offers
-    automatic Arrow dataset schema inference and row count collection at the
-    cost of some potential performance and/or compatibility penalties.
+    This implementation uses PyArrow's `ParquetDataset` abstraction for dataset reads,
+    and thus offers automatic Arrow dataset schema inference and row count collection at
+    the cost of some potential performance and/or compatibility penalties.
""" _FILE_EXTENSIONS = ["parquet"] diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 9467c7644c38..19dd640514e3 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -50,7 +50,6 @@ from ray.data._internal.datasource.mcap_datasource import MCAPDatasource, TimeRange from ray.data._internal.datasource.mongo_datasource import MongoDatasource from ray.data._internal.datasource.numpy_datasource import NumpyDatasource -from ray.data._internal.datasource.parquet_bulk_datasource import ParquetBulkDatasource from ray.data._internal.datasource.parquet_datasource import ParquetDatasource from ray.data._internal.datasource.range_datasource import RangeDatasource from ray.data._internal.datasource.sql_datasource import SQLDatasource @@ -100,12 +99,11 @@ ) from ray.data.datasource.file_meta_provider import ( DefaultFileMetadataProvider, - FastFileMetadataProvider, FileMetadataProvider, ) from ray.data.datasource.partitioning import Partitioning from ray.types import ObjectRef -from ray.util.annotations import Deprecated, DeveloperAPI, PublicAPI +from ray.util.annotations import DeveloperAPI, PublicAPI from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy if TYPE_CHECKING: @@ -1224,155 +1222,6 @@ class string ) -@Deprecated -def read_parquet_bulk( - paths: Union[str, List[str]], - *, - filesystem: Optional["pyarrow.fs.FileSystem"] = None, - columns: Optional[List[str]] = None, - parallelism: int = -1, - num_cpus: Optional[float] = None, - num_gpus: Optional[float] = None, - memory: Optional[float] = None, - ray_remote_args: Dict[str, Any] = None, - arrow_open_file_args: Optional[Dict[str, Any]] = None, - tensor_column_schema: Optional[Dict[str, Tuple[np.dtype, Tuple[int, ...]]]] = None, - meta_provider: Optional[BaseFileMetadataProvider] = None, - partition_filter: Optional[PathPartitionFilter] = None, - shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None, - include_paths: bool = False, - file_extensions: Optional[List[str]] = ParquetBulkDatasource._FILE_EXTENSIONS, - concurrency: Optional[int] = None, - override_num_blocks: Optional[int] = None, - **arrow_parquet_args, -) -> Dataset: - """Create :class:`~ray.data.Dataset` from parquet files without reading metadata. - - Use :meth:`~ray.data.read_parquet` for most cases. - - Use :meth:`~ray.data.read_parquet_bulk` if all the provided paths point to files - and metadata fetching using :meth:`~ray.data.read_parquet` takes too long or the - parquet files do not all have a unified schema. - - Performance slowdowns are possible when using this method with parquet files that - are very large. - - .. warning:: - - Only provide file paths as input (i.e., no directory paths). An - OSError is raised if one or more paths point to directories. If your - use-case requires directory paths, use :meth:`~ray.data.read_parquet` - instead. - - Examples: - Read multiple local files. You should always provide only input file paths - (i.e. no directory paths) when known to minimize read latency. - - >>> ray.data.read_parquet_bulk( # doctest: +SKIP - ... ["/path/to/file1", "/path/to/file2"]) - - Args: - paths: A single file path or a list of file paths. - filesystem: The PyArrow filesystem - implementation to read from. These filesystems are - specified in the - `PyArrow docs `_. - Specify this parameter if you need to provide specific configurations to - the filesystem. By default, the filesystem is automatically selected based - on the scheme of the paths. 
For example, if the path begins with ``s3://``, - the `S3FileSystem` is used. - columns: A list of column names to read. Only the - specified columns are read during the file scan. - parallelism: This argument is deprecated. Use ``override_num_blocks`` argument. - num_cpus: The number of CPUs to reserve for each parallel read worker. - num_gpus: The number of GPUs to reserve for each parallel read worker. For - example, specify `num_gpus=1` to request 1 GPU for each parallel read - worker. - memory: The heap memory in bytes to reserve for each parallel read worker. - ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks. - arrow_open_file_args: kwargs passed to - `pyarrow.fs.FileSystem.open_input_file `_. - when opening input files to read. - tensor_column_schema: A dict of column name to PyArrow dtype and shape - mappings for converting a Parquet column containing serialized - tensors (ndarrays) as their elements to PyArrow tensors. This function - assumes that the tensors are serialized in the raw - NumPy array format in C-contiguous order (e.g. via - `arr.tobytes()`). - meta_provider: [Deprecated] A :ref:`file metadata provider `. - Custom metadata providers may be able to resolve file metadata more quickly - and/or accurately. In most cases, you do not need to set this. If ``None``, - this function uses a system-chosen implementation. - partition_filter: A - :class:`~ray.data.datasource.partitioning.PathPartitionFilter`. Use - with a custom callback to read only selected partitions of a dataset. - By default, this filters out any file paths whose file extension does not - match "*.parquet*". - shuffle: If setting to "files", randomly shuffle input files order before read. - If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to - shuffle the input files. Defaults to not shuffle with ``None``. - arrow_parquet_args: Other parquet read options to pass to PyArrow. For the full - set of arguments, see - the `PyArrow API `_ - include_paths: If ``True``, include the path to each file. File paths are - stored in the ``'path'`` column. - file_extensions: A list of file extensions to filter files by. - concurrency: The maximum number of Ray tasks to run concurrently. Set this - to control number of tasks to run concurrently. This doesn't change the - total number of tasks run or the total number of output blocks. By default, - concurrency is dynamically decided based on the available resources. - override_num_blocks: Override the number of output blocks from all read tasks. - By default, the number of output blocks is dynamically decided based on - input data size and available resources. You shouldn't manually set this - value in most cases. - - Returns: - :class:`~ray.data.Dataset` producing records read from the specified paths. - """ - _emit_meta_provider_deprecation_warning(meta_provider) - - warnings.warn( - "`read_parquet_bulk` is deprecated and will be removed after May 2025. 
Use " - "`read_parquet` instead.", - DeprecationWarning, - ) - - if meta_provider is None: - meta_provider = FastFileMetadataProvider() - read_table_args = _resolve_parquet_args( - tensor_column_schema, - **arrow_parquet_args, - ) - if columns is not None: - read_table_args["columns"] = columns - - datasource = ParquetBulkDatasource( - paths, - read_table_args=read_table_args, - filesystem=filesystem, - open_stream_args=arrow_open_file_args, - meta_provider=meta_provider, - partition_filter=partition_filter, - shuffle=shuffle, - include_paths=include_paths, - file_extensions=file_extensions, - ) - return read_datasource( - datasource, - num_cpus=num_cpus, - num_gpus=num_gpus, - memory=memory, - parallelism=parallelism, - ray_remote_args=ray_remote_args, - concurrency=concurrency, - override_num_blocks=override_num_blocks, - ) - - @PublicAPI def read_json( paths: Union[str, List[str]], diff --git a/python/ray/data/tests/test_parquet.py b/python/ray/data/tests/test_parquet.py index 7b512732182c..bd34bf0c2782 100644 --- a/python/ray/data/tests/test_parquet.py +++ b/python/ray/data/tests/test_parquet.py @@ -23,7 +23,6 @@ get_arrow_extension_fixed_shape_tensor_types, ) from ray.data import FileShuffleConfig, Schema -from ray.data._internal.datasource.parquet_bulk_datasource import ParquetBulkDatasource from ray.data._internal.datasource.parquet_datasource import ( ParquetDatasource, ) @@ -33,7 +32,6 @@ from ray.data._internal.util import rows_same from ray.data.block import BlockAccessor from ray.data.context import DataContext -from ray.data.datasource import DefaultFileMetadataProvider from ray.data.datasource.partitioning import Partitioning, PathPartitionFilter from ray.data.datasource.path_util import _unwrap_protocol from ray.data.tests.conftest import * # noqa @@ -233,153 +231,6 @@ def test_parquet_read_random_shuffle( assert not all(all_rows_matched) -@pytest.mark.parametrize( - "fs,data_path", - [ - (None, lazy_fixture("local_path")), - (lazy_fixture("local_fs"), lazy_fixture("local_path")), - (lazy_fixture("s3_fs"), lazy_fixture("s3_path")), - ( - lazy_fixture("s3_fs_with_space"), - lazy_fixture("s3_path_with_space"), - ), # Path contains space. - ( - lazy_fixture("s3_fs_with_anonymous_crendential"), - lazy_fixture("s3_path_with_anonymous_crendential"), - ), - ], -) -def test_parquet_read_bulk( - ray_start_regular_shared, fs, data_path, target_max_block_size_infinite_or_default -): - df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]}) - table = pa.Table.from_pandas(df1) - setup_data_path = _unwrap_protocol(data_path) - path1 = os.path.join(setup_data_path, "test1.parquet") - pq.write_table(table, path1, filesystem=fs) - df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]}) - table = pa.Table.from_pandas(df2) - path2 = os.path.join(setup_data_path, "test2.parquet") - pq.write_table(table, path2, filesystem=fs) - - # Expect directory path expansion to fail due to default format-based path - # filtering: The filter will not match any of the files. - with pytest.raises(ValueError): - ray.data.read_parquet_bulk(data_path, filesystem=fs) - - # Expect directory path expansion to fail with OS error if default format-based path - # filtering is turned off. - with pytest.raises(OSError): - ds = ray.data.read_parquet_bulk(data_path, filesystem=fs, file_extensions=None) - ds.schema() - - paths = [path1, path2] - ds = ray.data.read_parquet_bulk(paths, filesystem=fs) - - # Expect to lazily compute all metadata correctly. 
-    input_files = ds.input_files()
-    assert len(input_files) == 2, input_files
-    assert "test1.parquet" in str(input_files)
-    assert "test2.parquet" in str(input_files)
-    assert not ds._plan.has_started_execution
-    assert ds.schema() == Schema(pa.schema({"one": pa.int64(), "two": pa.string()}))
-
-    # Schema isn't available, so we do a partial read.
-    assert not ds._plan.has_computed_output()
-
-    # Forces a data read.
-    values = [[s["one"], s["two"]] for s in ds.take()]
-    assert sorted(values) == [
-        [1, "a"],
-        [2, "b"],
-        [3, "c"],
-        [4, "e"],
-        [5, "f"],
-        [6, "g"],
-    ]
-
-    # Add a file with a non-matching file extension. This file should be ignored.
-    txt_path = os.path.join(data_path, "foo.txt")
-    txt_df = pd.DataFrame({"foobar": [4, 5, 6]})
-    txt_table = pa.Table.from_pandas(txt_df)
-    pq.write_table(txt_table, _unwrap_protocol(txt_path), filesystem=fs)
-
-    ds = ray.data.read_parquet_bulk(paths + [txt_path], filesystem=fs)
-    assert ds._plan.initial_num_blocks() == 2
-    assert not ds._plan.has_started_execution
-
-    # Forces a data read.
-    values = [[s["one"], s["two"]] for s in ds.take()]
-    assert sorted(values) == [
-        [1, "a"],
-        [2, "b"],
-        [3, "c"],
-        [4, "e"],
-        [5, "f"],
-        [6, "g"],
-    ]
-
-
-@pytest.mark.parametrize(
-    "fs,data_path",
-    [
-        (None, lazy_fixture("local_path")),
-        (lazy_fixture("local_fs"), lazy_fixture("local_path")),
-        (lazy_fixture("s3_fs"), lazy_fixture("s3_path")),
-        (
-            lazy_fixture("s3_fs_with_space"),
-            lazy_fixture("s3_path_with_space"),
-        ),  # Path contains space.
-        (
-            lazy_fixture("s3_fs_with_anonymous_crendential"),
-            lazy_fixture("s3_path_with_anonymous_crendential"),
-        ),
-    ],
-)
-def test_parquet_read_bulk_meta_provider(
-    ray_start_regular_shared, fs, data_path, target_max_block_size_infinite_or_default
-):
-    df1 = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
-    table = pa.Table.from_pandas(df1)
-    setup_data_path = _unwrap_protocol(data_path)
-    path1 = os.path.join(setup_data_path, "test1.parquet")
-    pq.write_table(table, path1, filesystem=fs)
-    df2 = pd.DataFrame({"one": [4, 5, 6], "two": ["e", "f", "g"]})
-    table = pa.Table.from_pandas(df2)
-    path2 = os.path.join(setup_data_path, "test2.parquet")
-    pq.write_table(table, path2, filesystem=fs)
-
-    # Expect directory path expansion to succeed with the default metadata provider.
-    ds = ray.data.read_parquet_bulk(
-        data_path,
-        filesystem=fs,
-        meta_provider=DefaultFileMetadataProvider(),
-    )
-
-    # Expect to lazily compute all metadata correctly.
-    input_files = ds.input_files()
-    assert len(input_files) == 2, input_files
-    assert "test1.parquet" in str(input_files)
-    assert "test2.parquet" in str(input_files)
-    assert not ds._plan.has_started_execution
-
-    assert ds.count() == 6
-    assert ds.size_bytes() > 0
-    assert ds.schema() == Schema(pa.schema({"one": pa.int64(), "two": pa.string()}))
-    assert not ds._plan.has_started_execution
-
-    # Forces a data read.
-    values = [[s["one"], s["two"]] for s in ds.take()]
-    assert sorted(values) == [
-        [1, "a"],
-        [2, "b"],
-        [3, "c"],
-        [4, "e"],
-        [5, "f"],
-        [6, "g"],
-    ]
-
-
 @pytest.mark.parametrize(
     "fs,data_path",
     [
@@ -1118,7 +969,6 @@ def test_parquet_datasource_names(ray_start_regular_shared, tmp_path):
     path = os.path.join(tmp_path, "data.parquet")
     df.to_parquet(path)
 
-    assert ParquetBulkDatasource(path).get_name() == "ParquetBulk"
     assert ParquetDatasource(path).get_name() == "Parquet"
 
 
@@ -1219,14 +1069,6 @@ def get_node_id():
     assert set(locations) == {node1_id, node2_id}, set(locations)
 
 
-def test_parquet_bulk_columns(
-    ray_start_regular_shared, target_max_block_size_infinite_or_default
-):
-    ds = ray.data.read_parquet_bulk("example://iris.parquet", columns=["variety"])
-
-    assert ds.columns() == ["variety"]
-
-
 @pytest.mark.parametrize("shuffle", [True, False, "file"])
 def test_invalid_shuffle_arg_raises_error(
     ray_start_regular_shared, shuffle, target_max_block_size_infinite_or_default
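
The removal above leaves `ray.data.read_parquet` as the single Parquet entry point. As a rough migration sketch (the file paths below are placeholders, and keyword arguments such as `columns` and `filesystem` are assumed to carry over unchanged), callers of the removed `read_parquet_bulk` can generally switch to `read_parquet` directly:

import ray

# Placeholder paths for illustration; unlike read_parquet_bulk, read_parquet
# also accepts directory paths, not just individual files.
paths = ["/path/to/file1.parquet", "/path/to/file2.parquet"]

# Previously: ds = ray.data.read_parquet_bulk(paths, columns=["one"])
ds = ray.data.read_parquet(paths, columns=["one"])

print(ds.schema())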