diff --git a/conda/environments/all_cuda-129_arch-aarch64.yaml b/conda/environments/all_cuda-129_arch-aarch64.yaml index b197f829d19..7a603a29ec9 100644 --- a/conda/environments/all_cuda-129_arch-aarch64.yaml +++ b/conda/environments/all_cuda-129_arch-aarch64.yaml @@ -74,6 +74,7 @@ dependencies: - pytest-benchmark - pytest-cases>=3.8.2 - pytest-cov +- pytest-httpserver - pytest-rerunfailures!=16.0.0 - pytest-xdist - python-confluent-kafka>=2.8.0,<2.9.0a0 diff --git a/conda/environments/all_cuda-129_arch-x86_64.yaml b/conda/environments/all_cuda-129_arch-x86_64.yaml index c342aa87116..40ed1cbbc6b 100644 --- a/conda/environments/all_cuda-129_arch-x86_64.yaml +++ b/conda/environments/all_cuda-129_arch-x86_64.yaml @@ -75,6 +75,7 @@ dependencies: - pytest-benchmark - pytest-cases>=3.8.2 - pytest-cov +- pytest-httpserver - pytest-rerunfailures!=16.0.0 - pytest-xdist - python-confluent-kafka>=2.8.0,<2.9.0a0 diff --git a/conda/environments/all_cuda-130_arch-aarch64.yaml b/conda/environments/all_cuda-130_arch-aarch64.yaml index 6666010fad0..e58e93e1aa6 100644 --- a/conda/environments/all_cuda-130_arch-aarch64.yaml +++ b/conda/environments/all_cuda-130_arch-aarch64.yaml @@ -74,6 +74,7 @@ dependencies: - pytest-benchmark - pytest-cases>=3.8.2 - pytest-cov +- pytest-httpserver - pytest-rerunfailures!=16.0.0 - pytest-xdist - python-confluent-kafka>=2.8.0,<2.9.0a0 diff --git a/conda/environments/all_cuda-130_arch-x86_64.yaml b/conda/environments/all_cuda-130_arch-x86_64.yaml index 19745ad1df5..0d4dc2b2c2d 100644 --- a/conda/environments/all_cuda-130_arch-x86_64.yaml +++ b/conda/environments/all_cuda-130_arch-x86_64.yaml @@ -75,6 +75,7 @@ dependencies: - pytest-benchmark - pytest-cases>=3.8.2 - pytest-cov +- pytest-httpserver - pytest-rerunfailures!=16.0.0 - pytest-xdist - python-confluent-kafka>=2.8.0,<2.9.0a0 diff --git a/dependencies.yaml b/dependencies.yaml index 7b51525cc94..0e96a48ad86 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -97,6 +97,7 @@ files: - 
test_python_common - test_python_cudf_common - test_python_pylibcudf + - test_python_cudf_polars - depends_on_cudf - depends_on_dask_cuda - depends_on_pylibcudf @@ -891,6 +892,7 @@ dependencies: - output_types: [conda, requirements, pyproject] packages: - rich + - pytest-httpserver test_python_narwhals: common: - output_types: [conda, requirements, pyproject] diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index e7688d07f2e..d8d5472d49d 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -325,18 +325,30 @@ def __init__( raise NotImplementedError( "Read from cloud storage" ) # pragma: no cover; no test yet - if any( - str(p).startswith("https:/" if POLARS_VERSION_LT_131 else "https://") - for p in self.paths - ): + if ( + any(str(p).startswith("https:/") for p in self.paths) + and POLARS_VERSION_LT_131 + ): # pragma: no cover; polars passed us the wrong URI + # https://github.com/pola-rs/polars/issues/22766 raise NotImplementedError("Read from https") if any( str(p).startswith("file:/" if POLARS_VERSION_LT_131 else "file://") for p in self.paths ): - # TODO: removing the file:// may work raise NotImplementedError("Read from file URI") if self.typ == "csv": + if any(plc.io.SourceInfo._is_remote_uri(p) for p in self.paths): + # This works fine when the file has no leading blank lines, + # but currently we do some file introspection + # to skip blanks before parsing the header. + # For remote files we cannot determine if leading blank lines + # exist, so we're punting on CSV support. + # TODO: Once the CSV reader supports skipping leading + # blank lines natively, we can remove this guard. 
+ raise NotImplementedError( + "Reading CSV from remote is not yet supported" + ) + if self.reader_options["skip_rows_after_header"] != 0: raise NotImplementedError("Skipping rows after header in CSV reader") parse_options = self.reader_options["parse_options"] diff --git a/python/cudf_polars/pyproject.toml b/python/cudf_polars/pyproject.toml index df0611348bb..cea7240d287 100644 --- a/python/cudf_polars/pyproject.toml +++ b/python/cudf_polars/pyproject.toml @@ -43,6 +43,7 @@ test = [ "numpy>=1.23,<3.0a0", "pytest", "pytest-cov", + "pytest-httpserver", "pytest-xdist", "rich", ] # This list was generated by `rapids-dependency-file-generator`. To make changes, edit ../../dependencies.yaml and run `rapids-dependency-file-generator`. diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py index 7510fe833be..cd6cbf3d981 100644 --- a/python/cudf_polars/tests/test_scan.py +++ b/python/cudf_polars/tests/test_scan.py @@ -6,6 +6,7 @@ from typing import TYPE_CHECKING import pytest +from werkzeug import Response import polars as pl @@ -14,10 +15,14 @@ assert_ir_translation_raises, ) from cudf_polars.testing.io import make_partitioned_source +from cudf_polars.utils.versions import POLARS_VERSION_LT_131 if TYPE_CHECKING: from pathlib import Path + from pytest_httpserver import HTTPServer + from werkzeug import Request + NO_CHUNK_ENGINE = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": False}) @@ -487,3 +492,114 @@ def test_scan_from_file_uri(tmp_path: Path) -> None: df.write_parquet(path) q = pl.scan_parquet(f"file://{path}") assert_ir_translation_raises(q, NotImplementedError) + + +@pytest.mark.parametrize("chunked", [False, True]) +def test_scan_parquet_remote( + request, tmp_path: Path, df: pl.DataFrame, httpserver: HTTPServer, *, chunked: bool +) -> None: + request.applymarker( + pytest.mark.xfail( + condition=POLARS_VERSION_LT_131, + reason="remote IO not supported", + ) + ) + path = tmp_path / "foo.parquet" + 
df.write_parquet(path) + bytes_ = path.read_bytes() + size = len(bytes_) + + def head_handler(_: Request) -> Response: + return Response( + status=200, + headers={ + "Content-Type": "parquet", + "Accept-Ranges": "bytes", + "Content-Length": size, + }, + ) + + def get_handler(req: Request) -> Response: + # parse bytes=200-500 for example (the actual data) + rng = req.headers.get("Range") + if rng and rng.startswith("bytes="): + start, end = map(int, req.headers["Range"][6:].split("-")) + mv = memoryview(bytes_)[start : end + 1] + return Response( + mv.tobytes(), + status=206, + headers={ + "Content-Type": "parquet", + "Accept-Ranges": "bytes", + "Content-Length": len(mv), + "Content-Range": f"bytes {start}-{end}/{size}", + }, + ) + return Response( + bytes_, + status=200, + headers={ + "Content-Type": "parquet", + "Accept-Ranges": "bytes", + "Content-Length": size, + }, + ) + + server_path = "/foo.parquet" + httpserver.expect_request(server_path, method="HEAD").respond_with_handler( + head_handler + ) + httpserver.expect_request(server_path, method="GET").respond_with_handler( + get_handler + ) + + q = pl.scan_parquet(httpserver.url_for(server_path)) + + assert_gpu_result_equal( + q, engine=pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": chunked}) + ) + + +def test_scan_ndjson_remote( + request, tmp_path: Path, df: pl.LazyFrame, httpserver: HTTPServer +) -> None: + request.applymarker( + pytest.mark.xfail( + condition=POLARS_VERSION_LT_131, + reason="remote IO not supported", + ) + ) + path = tmp_path / "foo.jsonl" + df.write_ndjson(path) + bytes_ = path.read_bytes() + size = len(bytes_) + + def head_handler(_: Request) -> Response: + return Response( + status=200, + headers={ + "Content-Type": "ndjson", + "Content-Length": size, + }, + ) + + def get_handler(_: Request) -> Response: + return Response( + bytes_, + status=200, + headers={ + "Content-Type": "ndjson", + "Content-Length": size, + }, + ) + + server_path = "/foo.jsonl" + 
httpserver.expect_request(server_path, method="HEAD").respond_with_handler( head_handler ) httpserver.expect_request(server_path, method="GET").respond_with_handler( get_handler ) q = pl.scan_ndjson(httpserver.url_for(server_path)) assert_gpu_result_equal(q) diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx index f37982a47b4..d0671c86402 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyx +++ b/python/pylibcudf/pylibcudf/io/types.pyx @@ -301,6 +301,7 @@ cdef class TableInputMetadata: for i in range(self.c_obj.column_metadata.size()) ] + cdef class TableWithMetadata: """A container holding a table and its associated metadata (e.g. column names) @@ -467,8 +468,6 @@ cdef class SourceInfo: A homogeneous list of sources to read from. Mixing different types of sources will raise a `ValueError`. """ - # Regular expression that match remote file paths supported by libcudf - _is_remote_file_pattern = re.compile(r"^[a-zA-Z][a-zA-Z0-9+.-]*://", re.IGNORECASE) def __init__(self, list sources): if not sources: @@ -483,7 +482,7 @@ cdef class SourceInfo: for src in sources: if not isinstance(src, (os.PathLike, str)): raise ValueError("All sources must be of the same type!") - if not (os.path.isfile(src) or self._is_remote_file_pattern.match(src)): + if not (os.path.isfile(src) or SourceInfo._is_remote_uri(src)): raise FileNotFoundError( errno.ENOENT, os.strerror(errno.ENOENT), src ) @@ -538,6 +537,13 @@ cdef class SourceInfo: self.c_obj = source_info(host_span[host_span[const_byte]](self._hspans)) + @staticmethod + def _is_remote_uri(path: str | os.PathLike) -> bool: + # Regular expression that matches remote file paths supported by libcudf + return re.compile( + r"^[a-zA-Z][a-zA-Z0-9+.\-]*://", re.IGNORECASE + ).match(str(path)) is not None + def _init_byte_like_sources(self, list sources, type expected_type): cdef const unsigned char[::1] c_buffer cdef bint empty_buffer = True