1 change: 0 additions & 1 deletion ci/lint/pydoclint-baseline.txt
@@ -1317,7 +1317,6 @@ python/ray/data/read_api.py
DOC101: Function `read_bigquery`: Docstring contains fewer arguments than in function signature.
DOC103: Function `read_bigquery`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [query: Optional[str]].
DOC103: Function `read_parquet`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**arrow_parquet_args: ]. Arguments in the docstring but not in the function signature: [arrow_parquet_args: ].
DOC103: Function `read_parquet_bulk`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**arrow_parquet_args: ]. Arguments in the docstring but not in the function signature: [arrow_parquet_args: ].
DOC103: Function `read_json`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**arrow_json_args: ]. Arguments in the docstring but not in the function signature: [arrow_json_args: ].
DOC103: Function `read_csv`: Docstring arguments are different from function arguments. (Or could be other formatting issues: https://jsh9.github.io/pydoclint/violation_codes.html#notes-on-doc103 ). Arguments in the function signature but not in the docstring: [**arrow_csv_args: ]. Arguments in the docstring but not in the function signature: [arrow_csv_args: ].
DOC101: Function `read_text`: Docstring contains fewer arguments than in function signature.
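As background on the DOC103 entries above: pydoclint reports a mismatch when a `**kwargs`-style parameter is written without its asterisks in the docstring. A minimal illustrative sketch (the function and argument names here are hypothetical, not taken from this PR); under pydoclint's default settings, keeping the asterisks in the `Args:` entry keeps the docstring and signature in sync:

def read_table(path: str, **arrow_parquet_args) -> None:
    """Read a table from a Parquet file.

    Args:
        path: Path to the file.
        **arrow_parquet_args: Extra options forwarded to the Parquet reader.
            Documenting this entry as the bare name ``arrow_parquet_args``
            is the mismatch that DOC103 flags in the baseline above.
    """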
1 change: 0 additions & 1 deletion doc/source/data/api/input_output.rst
@@ -32,7 +32,6 @@ Parquet
:toctree: doc/

read_parquet
read_parquet_bulk
Dataset.write_parquet

CSV
2 changes: 0 additions & 2 deletions python/ray/data/__init__.py
@@ -66,7 +66,6 @@
read_mongo,
read_numpy,
read_parquet,
read_parquet_bulk,
read_snowflake,
read_sql,
read_text,
@@ -175,7 +174,6 @@
"read_numpy",
"read_mongo",
"read_parquet",
"read_parquet_bulk",
"read_snowflake",
"read_sql",
"read_tfrecords",
51 changes: 0 additions & 51 deletions python/ray/data/_internal/datasource/parquet_bulk_datasource.py

This file was deleted.

7 changes: 3 additions & 4 deletions python/ray/data/_internal/datasource/parquet_datasource.py
@@ -281,10 +281,9 @@ def combine_predicates(
class ParquetDatasource(Datasource):
"""Parquet datasource, for reading and writing Parquet files.

The primary difference from ParquetBulkDatasource is that this uses
PyArrow's `ParquetDataset` abstraction for dataset reads, and thus offers
automatic Arrow dataset schema inference and row count collection at the
cost of some potential performance and/or compatibility penalties.
This implementation uses PyArrow's `ParquetDataset` abstraction for dataset reads,
and thus offers automatic Arrow dataset schema inference and row count collection at
the cost of some potential performance and/or compatibility penalties.
"""

_FILE_EXTENSIONS = ["parquet"]
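For context on the docstring change above: with the bulk datasource removed, `ParquetDatasource` is the single read path behind `ray.data.read_parquet`, relying on PyArrow's `ParquetDataset` for schema inference and row-count metadata. A minimal usage sketch of the public API it backs (the paths and column names are placeholders):

import ray

# read_parquet resolves schema and row counts up front via PyArrow's
# ParquetDataset, then reads the files in parallel tasks.
ds = ray.data.read_parquet(
    ["/data/file1.parquet", "/data/file2.parquet"],  # placeholder paths
    columns=["a", "b"],  # optional column projection (placeholder names)
)
print(ds.schema())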
153 changes: 1 addition & 152 deletions python/ray/data/read_api.py
@@ -50,7 +50,6 @@
from ray.data._internal.datasource.mcap_datasource import MCAPDatasource, TimeRange
from ray.data._internal.datasource.mongo_datasource import MongoDatasource
from ray.data._internal.datasource.numpy_datasource import NumpyDatasource
from ray.data._internal.datasource.parquet_bulk_datasource import ParquetBulkDatasource
from ray.data._internal.datasource.parquet_datasource import ParquetDatasource
from ray.data._internal.datasource.range_datasource import RangeDatasource
from ray.data._internal.datasource.sql_datasource import SQLDatasource
@@ -100,12 +99,11 @@
)
from ray.data.datasource.file_meta_provider import (
DefaultFileMetadataProvider,
FastFileMetadataProvider,
FileMetadataProvider,
)
from ray.data.datasource.partitioning import Partitioning
from ray.types import ObjectRef
from ray.util.annotations import Deprecated, DeveloperAPI, PublicAPI
from ray.util.annotations import DeveloperAPI, PublicAPI
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

if TYPE_CHECKING:
@@ -1224,155 +1222,6 @@ class string
)


@Deprecated
def read_parquet_bulk(
paths: Union[str, List[str]],
*,
filesystem: Optional["pyarrow.fs.FileSystem"] = None,
columns: Optional[List[str]] = None,
parallelism: int = -1,
num_cpus: Optional[float] = None,
num_gpus: Optional[float] = None,
memory: Optional[float] = None,
ray_remote_args: Dict[str, Any] = None,
arrow_open_file_args: Optional[Dict[str, Any]] = None,
tensor_column_schema: Optional[Dict[str, Tuple[np.dtype, Tuple[int, ...]]]] = None,
meta_provider: Optional[BaseFileMetadataProvider] = None,
partition_filter: Optional[PathPartitionFilter] = None,
shuffle: Optional[Union[Literal["files"], FileShuffleConfig]] = None,
include_paths: bool = False,
file_extensions: Optional[List[str]] = ParquetBulkDatasource._FILE_EXTENSIONS,
concurrency: Optional[int] = None,
override_num_blocks: Optional[int] = None,
**arrow_parquet_args,
) -> Dataset:
"""Create :class:`~ray.data.Dataset` from parquet files without reading metadata.

Use :meth:`~ray.data.read_parquet` for most cases.

Use :meth:`~ray.data.read_parquet_bulk` if all the provided paths point to files
and metadata fetching using :meth:`~ray.data.read_parquet` takes too long or the
parquet files do not all have a unified schema.

Performance slowdowns are possible when using this method with parquet files that
are very large.

.. warning::

Only provide file paths as input (i.e., no directory paths). An
OSError is raised if one or more paths point to directories. If your
use-case requires directory paths, use :meth:`~ray.data.read_parquet`
instead.

Examples:
Read multiple local files. You should always provide only input file paths
(i.e. no directory paths) when known to minimize read latency.

>>> ray.data.read_parquet_bulk( # doctest: +SKIP
... ["/path/to/file1", "/path/to/file2"])

Args:
paths: A single file path or a list of file paths.
filesystem: The PyArrow filesystem
implementation to read from. These filesystems are
specified in the
`PyArrow docs <https://arrow.apache.org/docs/python/api/\
filesystems.html#filesystem-implementations>`_.
Specify this parameter if you need to provide specific configurations to
the filesystem. By default, the filesystem is automatically selected based
on the scheme of the paths. For example, if the path begins with ``s3://``,
the `S3FileSystem` is used.
columns: A list of column names to read. Only the
specified columns are read during the file scan.
parallelism: This argument is deprecated. Use ``override_num_blocks`` argument.
num_cpus: The number of CPUs to reserve for each parallel read worker.
num_gpus: The number of GPUs to reserve for each parallel read worker. For
example, specify `num_gpus=1` to request 1 GPU for each parallel read
worker.
memory: The heap memory in bytes to reserve for each parallel read worker.
ray_remote_args: kwargs passed to :func:`ray.remote` in the read tasks.
arrow_open_file_args: kwargs passed to
`pyarrow.fs.FileSystem.open_input_file <https://arrow.apache.org/docs/\
python/generated/pyarrow.fs.FileSystem.html\
#pyarrow.fs.FileSystem.open_input_file>`_.
when opening input files to read.
tensor_column_schema: A dict of column name to PyArrow dtype and shape
mappings for converting a Parquet column containing serialized
tensors (ndarrays) as their elements to PyArrow tensors. This function
assumes that the tensors are serialized in the raw
NumPy array format in C-contiguous order (e.g. via
`arr.tobytes()`).
meta_provider: [Deprecated] A :ref:`file metadata provider <metadata_provider>`.
Custom metadata providers may be able to resolve file metadata more quickly
and/or accurately. In most cases, you do not need to set this. If ``None``,
this function uses a system-chosen implementation.
partition_filter: A
:class:`~ray.data.datasource.partitioning.PathPartitionFilter`. Use
with a custom callback to read only selected partitions of a dataset.
By default, this filters out any file paths whose file extension does not
match "*.parquet*".
shuffle: If setting to "files", randomly shuffle input files order before read.
If setting to :class:`~ray.data.FileShuffleConfig`, you can pass a seed to
shuffle the input files. Defaults to not shuffle with ``None``.
arrow_parquet_args: Other parquet read options to pass to PyArrow. For the full
set of arguments, see
the `PyArrow API <https://arrow.apache.org/docs/python/generated/\
pyarrow.dataset.Scanner.html#pyarrow.dataset.Scanner.from_fragment>`_
include_paths: If ``True``, include the path to each file. File paths are
stored in the ``'path'`` column.
file_extensions: A list of file extensions to filter files by.
concurrency: The maximum number of Ray tasks to run concurrently. Set this
to control number of tasks to run concurrently. This doesn't change the
total number of tasks run or the total number of output blocks. By default,
concurrency is dynamically decided based on the available resources.
override_num_blocks: Override the number of output blocks from all read tasks.
By default, the number of output blocks is dynamically decided based on
input data size and available resources. You shouldn't manually set this
value in most cases.

Returns:
:class:`~ray.data.Dataset` producing records read from the specified paths.
"""
_emit_meta_provider_deprecation_warning(meta_provider)

warnings.warn(
"`read_parquet_bulk` is deprecated and will be removed after May 2025. Use "
"`read_parquet` instead.",
DeprecationWarning,
)

if meta_provider is None:
meta_provider = FastFileMetadataProvider()
read_table_args = _resolve_parquet_args(
tensor_column_schema,
**arrow_parquet_args,
)
if columns is not None:
read_table_args["columns"] = columns

datasource = ParquetBulkDatasource(
paths,
read_table_args=read_table_args,
filesystem=filesystem,
open_stream_args=arrow_open_file_args,
meta_provider=meta_provider,
partition_filter=partition_filter,
shuffle=shuffle,
include_paths=include_paths,
file_extensions=file_extensions,
)
return read_datasource(
datasource,
num_cpus=num_cpus,
num_gpus=num_gpus,
memory=memory,
parallelism=parallelism,
ray_remote_args=ray_remote_args,
concurrency=concurrency,
override_num_blocks=override_num_blocks,
)
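
For callers of the deleted function, a hedged migration sketch (paths and column names are placeholders): the commonly used arguments from the signature above, such as `columns`, `shuffle`, `concurrency`, and `override_num_blocks`, also exist on `read_parquet`, and `read_parquet` additionally accepts directory paths.

import ray

# Before (removed here):
# ds = ray.data.read_parquet_bulk(["/path/to/file1", "/path/to/file2"])

# After: pass the same file paths to read_parquet.
ds = ray.data.read_parquet(
    ["/path/to/file1", "/path/to/file2"],  # placeholder paths from the docstring example
    shuffle="files",  # optional: randomize input file order, as before
)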


@PublicAPI
def read_json(
paths: Union[str, List[str]],