From a9ff5799f9596ce60d4127053c1bf2ce00535854 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Mon, 2 May 2022 13:52:32 -0700 Subject: [PATCH 01/14] enable s3 writes in chunked parquet writer --- conda/environments/cudf_dev_cuda11.5.yml | 5 ++ conda/recipes/cudf/meta.yaml | 1 + python/cudf/cudf/io/parquet.py | 69 +++++++++++++++++------- python/cudf/cudf/tests/test_s3.py | 42 +++++++++++++++ 4 files changed, 99 insertions(+), 18 deletions(-) diff --git a/conda/environments/cudf_dev_cuda11.5.yml b/conda/environments/cudf_dev_cuda11.5.yml index 15f4bff583e..0fef4bdd4ac 100644 --- a/conda/environments/cudf_dev_cuda11.5.yml +++ b/conda/environments/cudf_dev_cuda11.5.yml @@ -67,6 +67,11 @@ dependencies: - pydata-sphinx-theme - librdkafka=1.7.0 - python-confluent-kafka=1.7.0 + - moto>=1.3.14 + - boto3 + - s3fs + - flask + - flask_cors - pip: - git+https://github.com/python-streamz/streamz.git@master - pyorc diff --git a/conda/recipes/cudf/meta.yaml b/conda/recipes/cudf/meta.yaml index 84443a45567..f7087d14fbf 100644 --- a/conda/recipes/cudf/meta.yaml +++ b/conda/recipes/cudf/meta.yaml @@ -46,6 +46,7 @@ requirements: - cupy >=9.5.0,<11.0.0a0 - numba >=0.54 - numpy + - s3fs - {{ pin_compatible('pyarrow', max_pin='x.x.x') }} *cuda - libcudf {{ version }} - fastavro >=0.22.0 diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 5746bf6fec9..6fc3a357928 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -1,5 +1,7 @@ # Copyright (c) 2019-2022, NVIDIA CORPORATION. +import shutil +import tempfile import warnings from collections import defaultdict from contextlib import ExitStack @@ -7,6 +9,7 @@ from uuid import uuid4 import numpy as np +import s3fs from pyarrow import dataset as ds, parquet as pq import cudf @@ -206,12 +209,25 @@ def _process_dataset( filters = pq._filters_to_expression(filters) # Initialize ds.FilesystemDataset - dataset = ds.dataset( - paths, - filesystem=fs, - format="parquet", - partitioning="hive", - ) + if ( + isinstance(fs, s3fs.S3FileSystem) + and len(paths) == 1 + and fs.isdir(paths[0]) + ): + # TODO: Remove this workaround after following bug is fixed: + # https://issues.apache.org/jira/browse/ARROW-16438 + dataset = ds.dataset( + "s3://" + paths[0], + format="parquet", + partitioning="hive", + ) + else: + dataset = ds.dataset( + paths, + filesystem=fs, + format="parquet", + partitioning="hive", + ) file_list = dataset.files if len(file_list) == 0: raise FileNotFoundError(f"{paths} could not be resolved to any files") @@ -724,6 +740,7 @@ def __init__( index=None, compression=None, statistics="ROWGROUP", + **kwargs, ) -> None: """ Write a parquet file or dataset incrementally @@ -776,7 +793,12 @@ def __init__( .parquet """ - self.path = path + if isinstance(path, str) and path.startswith("s3://"): + self.fs_meta = {"is_s3": True, "actual_path": path} + self.path = tempfile.TemporaryDirectory().name + else: + self.fs_meta = {} + self.path = path self.common_args = { "index": index, "compression": compression, @@ -792,6 +814,7 @@ def __init__( # in self._chunked_writers for reverse lookup self.path_cw_map: Dict[str, int] = {} self.filename = None + self.kwargs = kwargs @_cudf_nvtx_annotate def write_table(self, df): @@ -837,18 +860,19 @@ def write_table(self, df): ] cw.write_table(grouped_df, this_cw_part_info) - # Create new cw for unhandled paths encountered in this write_table - new_paths, part_info, meta_paths = zip(*new_cw_paths) - self._chunked_writers.append( - ( - ParquetWriter(new_paths, 
**self.common_args), - new_paths, - meta_paths, + if new_cw_paths: + # Create new cw for unhandled paths encountered in this write_table + new_paths, part_info, meta_paths = zip(*new_cw_paths) + self._chunked_writers.append( + ( + ParquetWriter(new_paths, **self.common_args), + new_paths, + meta_paths, + ) ) - ) - new_cw_idx = len(self._chunked_writers) - 1 - self.path_cw_map.update({k: new_cw_idx for k in new_paths}) - self._chunked_writers[-1][0].write_table(grouped_df, part_info) + new_cw_idx = len(self._chunked_writers) - 1 + self.path_cw_map.update({k: new_cw_idx for k in new_paths}) + self._chunked_writers[-1][0].write_table(grouped_df, part_info) @_cudf_nvtx_annotate def close(self, return_metadata=False): @@ -862,6 +886,15 @@ def close(self, return_metadata=False): for cw, _, meta_path in self._chunked_writers ] + if self.fs_meta.get("is_s3", False): + local_path = self.path + s3_path = self.fs_meta["actual_path"] + s3_file, _ = ioutils._get_filesystem_and_paths( + s3_path, **self.kwargs + ) + s3_file.put(local_path, s3_path, recursive=True) + shutil.rmtree(self.path) + if return_metadata: return ( merge_parquet_filemetadata(metadata) diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index d783483a8cb..41368dab289 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -22,6 +22,8 @@ boto3 = pytest.importorskip("boto3") requests = pytest.importorskip("requests") s3fs = pytest.importorskip("s3fs") +flask = pytest.importorskip("flask") +flask_cors = pytest.importorskip("flask_cors") @contextmanager @@ -49,6 +51,7 @@ def s3_base(worker_id): # system aws credentials, https://github.com/spulec/moto/issues/1793 os.environ.setdefault("AWS_ACCESS_KEY_ID", "foobar_key") os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foobar_secret") + os.environ.setdefault("S3FS_LOGGING_LEVEL", "DEBUG") # Launching moto in server mode, i.e., as a separate process # with an S3 endpoint on localhost @@ -457,3 +460,42 @@ def test_write_orc(s3_base, s3so, pdf): got = pa.orc.ORCFile(f).read().to_pandas() assert_eq(pdf, got) + + +def test_write_chunked_parquet(s3_base, s3so): + df1 = cudf.DataFrame({"b": [10, 11, 12], "a": [1, 2, 3]}) + df2 = cudf.DataFrame({"b": [20, 30, 50], "a": [3, 2, 1]}) + dirname = "chunked_writer_directory" + bname = "parquet" + from cudf.io.parquet import ParquetDatasetWriter + + with s3_context( + s3_base=s3_base, bucket=bname, files={dirname: BytesIO()} + ) as s3fs: + cw = ParquetDatasetWriter( + f"s3://{bname}/{dirname}", + partition_cols=["a"], + storage_options=s3so, + ) + cw.write_table(df1) + cw.write_table(df2) + cw.close() + + # TODO: Replace following workaround with: + # expect = cudf.read_parquet(f"s3://{bname}/{dirname}/", + # storage_options=s3so) + # after the following bug is fixed: + # https://issues.apache.org/jira/browse/ARROW-16438 + + dfs = [] + for folder in {"a=1", "a=2", "a=3"}: + assert s3fs.exists(f"s3://{bname}/{dirname}/{folder}") + for file in s3fs.ls(f"s3://{bname}/{dirname}/{folder}"): + df = cudf.read_parquet("s3://" + file, storage_options=s3so) + dfs.append(df) + + actual = cudf.concat(dfs).astype("int64") + assert_eq( + actual.sort_values(["b"]).reset_index(drop=True), + cudf.concat([df1, df2]).sort_values(["b"]).reset_index(drop=True), + ) From 8176093651d7f74ca0c93e83a81ea84449653850 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Mon, 2 May 2022 16:12:50 -0700 Subject: [PATCH 02/14] updates --- conda/environments/cudf_dev_cuda11.5.yml | 8 +++++--- conda/recipes/cudf/meta.yaml | 2 
+- python/cudf/cudf/tests/test_s3.py | 2 +- python/dask_cudf/dask_cudf/io/tests/test_s3.py | 3 +++ 4 files changed, 10 insertions(+), 5 deletions(-) diff --git a/conda/environments/cudf_dev_cuda11.5.yml b/conda/environments/cudf_dev_cuda11.5.yml index 0fef4bdd4ac..3eb1028b134 100644 --- a/conda/environments/cudf_dev_cuda11.5.yml +++ b/conda/environments/cudf_dev_cuda11.5.yml @@ -67,9 +67,11 @@ dependencies: - pydata-sphinx-theme - librdkafka=1.7.0 - python-confluent-kafka=1.7.0 - - moto>=1.3.14 - - boto3 - - s3fs + - moto>=3.1.6 + - boto3>=1.21.21 + - botocore>=1.24.21 + - aiobotocore>=2.2.0 + - s3fs>=2022.3.0 - flask - flask_cors - pip: diff --git a/conda/recipes/cudf/meta.yaml b/conda/recipes/cudf/meta.yaml index f7087d14fbf..86e1fbf3f2f 100644 --- a/conda/recipes/cudf/meta.yaml +++ b/conda/recipes/cudf/meta.yaml @@ -46,7 +46,7 @@ requirements: - cupy >=9.5.0,<11.0.0a0 - numba >=0.54 - numpy - - s3fs + - s3fs >=2022.3.0 - {{ pin_compatible('pyarrow', max_pin='x.x.x') }} *cuda - libcudf {{ version }} - fastavro >=0.22.0 diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index 41368dab289..0714c030014 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -18,7 +18,7 @@ import cudf from cudf.testing._utils import assert_eq -moto = pytest.importorskip("moto", minversion="1.3.14") +moto = pytest.importorskip("moto", minversion="3.1.6") boto3 = pytest.importorskip("boto3") requests = pytest.importorskip("requests") s3fs = pytest.importorskip("s3fs") diff --git a/python/dask_cudf/dask_cudf/io/tests/test_s3.py b/python/dask_cudf/dask_cudf/io/tests/test_s3.py index 83ff1273b36..88d4374769c 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_s3.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_s3.py @@ -1,3 +1,5 @@ +# Copyright (c) 2020-2022, NVIDIA CORPORATION. 
+ import os import shlex import subprocess @@ -42,6 +44,7 @@ def s3_base(worker_id): # system aws credentials, https://github.com/spulec/moto/issues/1793 os.environ.setdefault("AWS_ACCESS_KEY_ID", "foobar_key") os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foobar_secret") + os.environ.setdefault("S3FS_LOGGING_LEVEL", "DEBUG") # Launching moto in server mode, i.e., as a separate process # with an S3 endpoint on localhost From 7569db602449c2c66478645a22cd385ae64aecee Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Mon, 2 May 2022 18:37:22 -0700 Subject: [PATCH 03/14] remove s3fs hard requirement --- conda/recipes/cudf/meta.yaml | 1 - python/cudf/cudf/io/parquet.py | 9 +++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/conda/recipes/cudf/meta.yaml b/conda/recipes/cudf/meta.yaml index 86e1fbf3f2f..84443a45567 100644 --- a/conda/recipes/cudf/meta.yaml +++ b/conda/recipes/cudf/meta.yaml @@ -46,7 +46,6 @@ requirements: - cupy >=9.5.0,<11.0.0a0 - numba >=0.54 - numpy - - s3fs >=2022.3.0 - {{ pin_compatible('pyarrow', max_pin='x.x.x') }} *cuda - libcudf {{ version }} - fastavro >=0.22.0 diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 6fc3a357928..22c516dfece 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -9,7 +9,11 @@ from uuid import uuid4 import numpy as np -import s3fs + +try: + import s3fs +except (ImportError, ModuleNotFoundError): + s3fs = None from pyarrow import dataset as ds, parquet as pq import cudf @@ -210,7 +214,8 @@ def _process_dataset( # Initialize ds.FilesystemDataset if ( - isinstance(fs, s3fs.S3FileSystem) + s3fs is not None + and isinstance(fs, s3fs.S3FileSystem) and len(paths) == 1 and fs.isdir(paths[0]) ): From 6b01d73b30711e26d6ba936838d95574b019368f Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Tue, 3 May 2022 11:15:35 -0700 Subject: [PATCH 04/14] change workaround --- python/cudf/cudf/io/parquet.py | 34 +++++++++++++++------------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 22c516dfece..cf1e1167ccb 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -213,26 +213,22 @@ def _process_dataset( filters = pq._filters_to_expression(filters) # Initialize ds.FilesystemDataset - if ( - s3fs is not None - and isinstance(fs, s3fs.S3FileSystem) - and len(paths) == 1 - and fs.isdir(paths[0]) - ): - # TODO: Remove this workaround after following bug is fixed: - # https://issues.apache.org/jira/browse/ARROW-16438 - dataset = ds.dataset( - "s3://" + paths[0], - format="parquet", - partitioning="hive", - ) - else: - dataset = ds.dataset( - paths, - filesystem=fs, - format="parquet", - partitioning="hive", + # TODO: Remove the s3fs workaround after following bug is fixed: + # https://issues.apache.org/jira/browse/ARROW-16438 + dataset = ds.dataset( + source=paths[0] + if ( + s3fs is not None + and isinstance(fs, s3fs.S3FileSystem) + and len(paths) == 1 + and fs.isdir(paths[0]) ) + else paths, + filesystem=fs, + format="parquet", + partitioning="hive", + ) + file_list = dataset.files if len(file_list) == 0: raise FileNotFoundError(f"{paths} could not be resolved to any files") From 9d3f9bec1d52b67cbebdd45935282093942c6f30 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Tue, 3 May 2022 11:51:11 -0700 Subject: [PATCH 05/14] better port handling --- python/cudf/cudf/tests/test_s3.py | 21 +++++++++--------- .../dask_cudf/dask_cudf/io/tests/test_s3.py | 22 
+++++++++---------- 2 files changed, 22 insertions(+), 21 deletions(-) diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index 0714c030014..5be3495ed20 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -2,6 +2,7 @@ import os import shlex +import socket import subprocess import time from contextlib import contextmanager @@ -26,6 +27,14 @@ flask_cors = pytest.importorskip("flask_cors") +@pytest.fixture(scope="session") +def endpoint_port(): + # Return a free port per worker session. + sock = socket.socket() + sock.bind(("", 0)) + return sock.getsockname()[1] + + @contextmanager def ensure_safe_environment_variables(): """ @@ -42,7 +51,7 @@ def ensure_safe_environment_variables(): @pytest.fixture(scope="session") -def s3_base(worker_id): +def s3_base(endpoint_port): """ Fixture to set up moto server in separate process """ @@ -56,11 +65,6 @@ def s3_base(worker_id): # Launching moto in server mode, i.e., as a separate process # with an S3 endpoint on localhost - endpoint_port = ( - 5000 - if worker_id == "master" - else 5550 + int(worker_id.lstrip("gw")) - ) endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" proc = subprocess.Popen( @@ -85,13 +89,10 @@ def s3_base(worker_id): @pytest.fixture() -def s3so(worker_id): +def s3so(endpoint_port): """ Returns s3 storage options to pass to fsspec """ - endpoint_port = ( - 5000 if worker_id == "master" else 5550 + int(worker_id.lstrip("gw")) - ) endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" return {"client_kwargs": {"endpoint_url": endpoint_uri}} diff --git a/python/dask_cudf/dask_cudf/io/tests/test_s3.py b/python/dask_cudf/dask_cudf/io/tests/test_s3.py index 88d4374769c..547e8cc2816 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_s3.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_s3.py @@ -2,6 +2,7 @@ import os import shlex +import socket import subprocess import time from contextlib import contextmanager @@ -19,6 +20,14 @@ s3fs = pytest.importorskip("s3fs") +@pytest.fixture(scope="session") +def endpoint_port(): + # Return a free port per worker session. 
+ sock = socket.socket() + sock.bind(("", 0)) + return sock.getsockname()[1] + + @contextmanager def ensure_safe_environment_variables(): """ @@ -35,7 +44,7 @@ def ensure_safe_environment_variables(): @pytest.fixture(scope="session") -def s3_base(worker_id): +def s3_base(endpoint_port): """ Fixture to set up moto server in separate process """ @@ -48,12 +57,6 @@ def s3_base(worker_id): # Launching moto in server mode, i.e., as a separate process # with an S3 endpoint on localhost - - endpoint_port = ( - 5000 - if worker_id == "master" - else 5550 + int(worker_id.lstrip("gw")) - ) endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" proc = subprocess.Popen( @@ -78,13 +81,10 @@ def s3_base(worker_id): @pytest.fixture() -def s3so(worker_id): +def s3so(endpoint_port): """ Returns s3 storage options to pass to fsspec """ - endpoint_port = ( - 5000 if worker_id == "master" else 5550 + int(worker_id.lstrip("gw")) - ) endpoint_uri = f"http://127.0.0.1:{endpoint_port}/" return {"client_kwargs": {"endpoint_url": endpoint_uri}} From a79e6d5bc9082e3b57673e606d5c20693fb98a57 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Fri, 6 May 2022 09:50:05 -0700 Subject: [PATCH 06/14] remove s3fs and fix a pytest --- python/cudf/cudf/io/parquet.py | 14 +------------- python/cudf/cudf/tests/test_s3.py | 23 ++++++++++++++++++----- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index cf1e1167ccb..43a3d0c0ece 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -9,11 +9,6 @@ from uuid import uuid4 import numpy as np - -try: - import s3fs -except (ImportError, ModuleNotFoundError): - s3fs = None from pyarrow import dataset as ds, parquet as pq import cudf @@ -216,14 +211,7 @@ def _process_dataset( # TODO: Remove the s3fs workaround after following bug is fixed: # https://issues.apache.org/jira/browse/ARROW-16438 dataset = ds.dataset( - source=paths[0] - if ( - s3fs is not None - and isinstance(fs, s3fs.S3FileSystem) - and len(paths) == 1 - and fs.isdir(paths[0]) - ) - else paths, + source=paths[0] if len(paths) == 1 else paths, filesystem=fs, format="parquet", partitioning="hive", diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index 5be3495ed20..3154dece0f6 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -364,20 +364,33 @@ def test_read_parquet_filters(s3_base, s3so, pdf_ext, precache): @pytest.mark.parametrize("partition_cols", [None, ["String"]]) def test_write_parquet(s3_base, s3so, pdf, partition_cols): - fname = "test_parquet_writer.parquet" + fname_cudf = "test_parquet_writer_cudf" + fname_pandas = "test_parquet_writer_pandas" bname = "parquet" gdf = cudf.from_pandas(pdf) + with s3_context(s3_base=s3_base, bucket=bname) as s3fs: gdf.to_parquet( - f"s3://{bname}/{fname}", + f"s3://{bname}/{fname_cudf}", partition_cols=partition_cols, storage_options=s3so, ) - assert s3fs.exists(f"s3://{bname}/{fname}") + assert s3fs.exists(f"s3://{bname}/{fname_cudf}") + pdf.to_parquet( + f"s3://{bname}/{fname_pandas}", + partition_cols=partition_cols, + storage_options=s3so, + ) + assert s3fs.exists(f"s3://{bname}/{fname_pandas}") - got = pd.read_parquet(s3fs.open(f"s3://{bname}/{fname}")) + got = pd.read_parquet( + f"s3://{bname}/{fname_pandas}", storage_options=s3so + ) + expect = cudf.read_parquet( + f"s3://{bname}/{fname_cudf}", storage_options=s3so + ) - assert_eq(pdf, got) + assert_eq(expect, got) def test_read_json(s3_base, s3so): 
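The fixture plumbing that patch 05 introduces (and patches 11 and 13 later refine) works as follows: bind a throwaway socket to port 0 so the OS hands out a free port, launch moto in server mode on that port, and point fsspec/s3fs at the resulting endpoint through `storage_options`. Below is a minimal sketch of that pattern outside pytest. The exact `moto_server` invocation is cut off by the hunk context above, so the command line here is an assumption, and `free_port`/`launch_moto_s3` are illustrative names rather than fixtures from these patches:

    import shlex
    import socket
    import subprocess
    import time

    import requests


    def free_port():
        # Bind to port 0 so the OS assigns an unused port; close the socket
        # so the moto server can bind it (patch 13 adds this close()).
        sock = socket.socket()
        sock.bind(("127.0.0.1", 0))
        port = sock.getsockname()[1]
        sock.close()
        return port


    def launch_moto_s3(port):
        # Launch moto in server mode, i.e., as a separate process with an
        # S3 endpoint on localhost, and poll until it answers HTTP.
        # Assumes moto>=2, where ``moto_server -p PORT`` serves all services.
        endpoint_uri = f"http://127.0.0.1:{port}/"
        proc = subprocess.Popen(shlex.split(f"moto_server -p {port}"))
        for _ in range(50):
            try:
                if requests.get(endpoint_uri).ok:
                    return proc, endpoint_uri
            except requests.exceptions.ConnectionError:
                pass
            time.sleep(0.1)
        proc.terminate()
        raise RuntimeError("moto server did not come up")


    port = free_port()
    proc, endpoint_uri = launch_moto_s3(port)
    # Same shape as the s3so fixture's return value:
    s3so = {"client_kwargs": {"endpoint_url": endpoint_uri}}
    # ... run cudf/dask_cudf S3 I/O against endpoint_uri, then proc.terminate()

The `s3so` dictionary is exactly what every `storage_options=s3so` call in these tests forwards to s3fs, routing all S3 traffic to the local moto endpoint instead of real AWS.
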
From a11b65848f36200ccbb03937d5914d20f0892efb Mon Sep 17 00:00:00 2001
From: galipremsagar
Date: Fri, 6 May 2022 09:57:52 -0700
Subject: [PATCH 07/14] tweak todo comment

---
 python/cudf/cudf/io/parquet.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index 43a3d0c0ece..fb047ba22c2 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -208,7 +208,7 @@ def _process_dataset(
         filters = pq._filters_to_expression(filters)
 
     # Initialize ds.FilesystemDataset
-    # TODO: Remove the s3fs workaround after following bug is fixed:
+    # TODO: Remove the if len(paths) workaround after the bug below is fixed:
     # https://issues.apache.org/jira/browse/ARROW-16438
     dataset = ds.dataset(
         source=paths[0] if len(paths) == 1 else paths,

From aa7e050d7098b7c42b3fe148e7c1ea97281198e7 Mon Sep 17 00:00:00 2001
From: galipremsagar
Date: Fri, 6 May 2022 13:02:21 -0700
Subject: [PATCH 08/14] update docs

---
 docs/cudf/source/api_docs/io.rst |   4 ++
 python/cudf/cudf/io/__init__.py  |   3 +-
 python/cudf/cudf/io/parquet.py   | 103 ++++++++++++++++---------------
 3 files changed, 58 insertions(+), 52 deletions(-)

diff --git a/docs/cudf/source/api_docs/io.rst b/docs/cudf/source/api_docs/io.rst
index 7e4d1b48c93..a52667cd3e4 100644
--- a/docs/cudf/source/api_docs/io.rst
+++ b/docs/cudf/source/api_docs/io.rst
@@ -36,6 +36,10 @@ Parquet
    read_parquet
    DataFrame.to_parquet
    cudf.io.parquet.read_parquet_metadata
+   :template: autosummary/class_with_autosummary.rst
+
+   cudf.io.parquet.ParquetDatasetWriter
+
 
 ORC
 ~~~
diff --git a/python/cudf/cudf/io/__init__.py b/python/cudf/cudf/io/__init__.py
index 15404b26042..4ec84ecbc74 100644
--- a/python/cudf/cudf/io/__init__.py
+++ b/python/cudf/cudf/io/__init__.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2018, NVIDIA CORPORATION.
+# Copyright (c) 2018-2022, NVIDIA CORPORATION.
 from cudf.io.avro import read_avro
 from cudf.io.csv import read_csv, to_csv
 from cudf.io.dlpack import from_dlpack
@@ -9,6 +9,7 @@
 from cudf.io.parquet import (
+    ParquetDatasetWriter,
     merge_parquet_filemetadata,
     read_parquet,
     read_parquet_metadata,
     write_to_dataset,
 )
diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py
index fb047ba22c2..93c45794599 100644
--- a/python/cudf/cudf/io/parquet.py
+++ b/python/cudf/cudf/io/parquet.py
@@ -721,6 +721,58 @@ def _get_partitioned(
 
 
 class ParquetDatasetWriter:
+    """
+    Write a parquet file or dataset incrementally
+
+    Parameters
+    ----------
+    path : str or s3 and URL
+        A Local directory path/s3 URL. Will be used as Root Directory
+        path while writing a partitioned dataset.
+    partition_cols : list
+        Column names by which to partition the dataset
+        Columns are partitioned in the order they are given
+    index : bool, default None
+        If ``True``, include the dataframe’s index(es) in the file output.
+        If ``False``, they will not be written to the file. If ``None``,
+        index(es) other than RangeIndex will be saved as columns.
+    compression : {'snappy', None}, default 'snappy'
+        Name of the compression to use. Use ``None`` for no compression.
+    statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP'
+        Level at which column statistics should be included in file.
+
+
+    Examples
+    --------
+    Using a context
+
+    >>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]})
+    >>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]})
+    >>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw:
+    ...     cw.write_table(df1)
+    ...     
cw.write_table(df2) + + By manually calling ``close()`` + + >>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"]) + >>> cw.write_table(df1) + >>> cw.write_table(df2) + >>> cw.close() + + Both the methods will generate the same directory structure + + .. code-block:: bash + + dataset/ + a=1 + .parquet + a=2 + .parquet + a=3 + .parquet + + """ + @_cudf_nvtx_annotate def __init__( self, @@ -731,57 +783,6 @@ def __init__( statistics="ROWGROUP", **kwargs, ) -> None: - """ - Write a parquet file or dataset incrementally - - Parameters - ---------- - path : str - File path or Root Directory path. Will be used as Root Directory - path while writing a partitioned dataset. - partition_cols : list - Column names by which to partition the dataset - Columns are partitioned in the order they are given - index : bool, default None - If ``True``, include the dataframe’s index(es) in the file output. - If ``False``, they will not be written to the file. If ``None``, - index(es) other than RangeIndex will be saved as columns. - compression : {'snappy', None}, default 'snappy' - Name of the compression to use. Use ``None`` for no compression. - statistics : {'ROWGROUP', 'PAGE', 'NONE'}, default 'ROWGROUP' - Level at which column statistics should be included in file. - - - Examples - ________ - Using a context - - >>> df1 = cudf.DataFrame({"a": [1, 1, 2, 2, 1], "b": [9, 8, 7, 6, 5]}) - >>> df2 = cudf.DataFrame({"a": [1, 3, 3, 1, 3], "b": [4, 3, 2, 1, 0]}) - >>> with ParquetDatasetWriter("./dataset", partition_cols=["a"]) as cw: - ... cw.write_table(df1) - ... cw.write_table(df2) - - By manually calling ``close()`` - - >>> cw = ParquetDatasetWriter("./dataset", partition_cols=["a"]) - >>> cw.write_table(df1) - >>> cw.write_table(df2) - >>> cw.close() - - Both the methods will generate the same directory structure - - .. code-block:: bash - - dataset/ - a=1 - .parquet - a=2 - .parquet - a=3 - .parquet - - """ if isinstance(path, str) and path.startswith("s3://"): self.fs_meta = {"is_s3": True, "actual_path": path} self.path = tempfile.TemporaryDirectory().name From 23eccb614b7611cd8c763575f641635fee59fb49 Mon Sep 17 00:00:00 2001 From: GALI PREM SAGAR Date: Mon, 9 May 2022 13:43:38 -0500 Subject: [PATCH 09/14] Apply suggestions from code review Co-authored-by: Bradley Dice --- python/cudf/cudf/io/parquet.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 93c45794599..f30bfec45c9 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -726,14 +726,14 @@ class ParquetDatasetWriter: Parameters ---------- - path : str or s3 and URL - A Local directory path/s3 URL. Will be used as Root Directory + path : str + A local directory path or S3 URL. Will be used as root directory path while writing a partitioned dataset. partition_cols : list Column names by which to partition the dataset Columns are partitioned in the order they are given index : bool, default None - If ``True``, include the dataframe’s index(es) in the file output. + If ``True``, include the dataframe's index(es) in the file output. If ``False``, they will not be written to the file. If ``None``, index(es) other than RangeIndex will be saved as columns. compression : {'snappy', None}, default 'snappy' @@ -761,7 +761,7 @@ class ParquetDatasetWriter: Both the methods will generate the same directory structure - .. code-block:: bash + .. 
code-block:: none dataset/ a=1 From 14a100acd066aeeb32b48dd2b987b97beb21c8a8 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Mon, 9 May 2022 12:13:54 -0700 Subject: [PATCH 10/14] remove explicit flask dependency --- conda/environments/cudf_dev_cuda11.5.yml | 2 -- python/cudf/cudf/tests/test_s3.py | 2 -- 2 files changed, 4 deletions(-) diff --git a/conda/environments/cudf_dev_cuda11.5.yml b/conda/environments/cudf_dev_cuda11.5.yml index 3eb1028b134..96f0c9c02a4 100644 --- a/conda/environments/cudf_dev_cuda11.5.yml +++ b/conda/environments/cudf_dev_cuda11.5.yml @@ -72,8 +72,6 @@ dependencies: - botocore>=1.24.21 - aiobotocore>=2.2.0 - s3fs>=2022.3.0 - - flask - - flask_cors - pip: - git+https://github.com/python-streamz/streamz.git@master - pyorc diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index 3154dece0f6..749a12e8934 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -23,8 +23,6 @@ boto3 = pytest.importorskip("boto3") requests = pytest.importorskip("requests") s3fs = pytest.importorskip("s3fs") -flask = pytest.importorskip("flask") -flask_cors = pytest.importorskip("flask_cors") @pytest.fixture(scope="session") From 3591b3e1e5597c563a738cf3f5779b2296dad2b3 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Mon, 9 May 2022 12:26:52 -0700 Subject: [PATCH 11/14] use 127.0.0.1 explicitly --- python/cudf/cudf/tests/test_s3.py | 2 +- python/dask_cudf/dask_cudf/io/tests/test_s3.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index 749a12e8934..6282de7cae3 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -29,7 +29,7 @@ def endpoint_port(): # Return a free port per worker session. sock = socket.socket() - sock.bind(("", 0)) + sock.bind(("127.0.0.1", 0)) return sock.getsockname()[1] diff --git a/python/dask_cudf/dask_cudf/io/tests/test_s3.py b/python/dask_cudf/dask_cudf/io/tests/test_s3.py index 547e8cc2816..77e5f38f251 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_s3.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_s3.py @@ -24,7 +24,7 @@ def endpoint_port(): # Return a free port per worker session. 
sock = socket.socket() - sock.bind(("", 0)) + sock.bind(("127.0.0.1", 0)) return sock.getsockname()[1] From 75c5de33a74f79b5f2c412d417e04e9d2aa9a136 Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Mon, 9 May 2022 12:29:50 -0700 Subject: [PATCH 12/14] context manager style --- python/cudf/cudf/tests/test_s3.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index 6282de7cae3..f2a14ecadb8 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -478,31 +478,30 @@ def test_write_chunked_parquet(s3_base, s3so): df1 = cudf.DataFrame({"b": [10, 11, 12], "a": [1, 2, 3]}) df2 = cudf.DataFrame({"b": [20, 30, 50], "a": [3, 2, 1]}) dirname = "chunked_writer_directory" - bname = "parquet" + bucket = "parquet" from cudf.io.parquet import ParquetDatasetWriter with s3_context( - s3_base=s3_base, bucket=bname, files={dirname: BytesIO()} + s3_base=s3_base, bucket=bucket, files={dirname: BytesIO()} ) as s3fs: - cw = ParquetDatasetWriter( - f"s3://{bname}/{dirname}", + with ParquetDatasetWriter( + f"s3://{bucket}/{dirname}", partition_cols=["a"], storage_options=s3so, - ) - cw.write_table(df1) - cw.write_table(df2) - cw.close() + ) as cw: + cw.write_table(df1) + cw.write_table(df2) # TODO: Replace following workaround with: - # expect = cudf.read_parquet(f"s3://{bname}/{dirname}/", + # expect = cudf.read_parquet(f"s3://{bucket}/{dirname}/", # storage_options=s3so) # after the following bug is fixed: # https://issues.apache.org/jira/browse/ARROW-16438 dfs = [] for folder in {"a=1", "a=2", "a=3"}: - assert s3fs.exists(f"s3://{bname}/{dirname}/{folder}") - for file in s3fs.ls(f"s3://{bname}/{dirname}/{folder}"): + assert s3fs.exists(f"s3://{bucket}/{dirname}/{folder}") + for file in s3fs.ls(f"s3://{bucket}/{dirname}/{folder}"): df = cudf.read_parquet("s3://" + file, storage_options=s3so) dfs.append(df) From 2a743a2b8c13a741607d7367a48aa8780ee4bd9e Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Tue, 10 May 2022 08:50:05 -0700 Subject: [PATCH 13/14] close port --- python/cudf/cudf/tests/test_s3.py | 4 +++- python/dask_cudf/dask_cudf/io/tests/test_s3.py | 6 ++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index 8cb1d17777f..eb739cf6c8a 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -30,7 +30,9 @@ def endpoint_port(): # Return a free port per worker session. sock = socket.socket() sock.bind(("127.0.0.1", 0)) - return sock.getsockname()[1] + port = sock.getsockname()[1] + sock.close() + return port @contextmanager diff --git a/python/dask_cudf/dask_cudf/io/tests/test_s3.py b/python/dask_cudf/dask_cudf/io/tests/test_s3.py index 77e5f38f251..9283380296c 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_s3.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_s3.py @@ -14,7 +14,7 @@ import dask_cudf -moto = pytest.importorskip("moto", minversion="1.3.14") +moto = pytest.importorskip("moto", minversion="3.1.6") boto3 = pytest.importorskip("boto3") requests = pytest.importorskip("requests") s3fs = pytest.importorskip("s3fs") @@ -25,7 +25,9 @@ def endpoint_port(): # Return a free port per worker session. 
sock = socket.socket() sock.bind(("127.0.0.1", 0)) - return sock.getsockname()[1] + port = sock.getsockname()[1] + sock.close() + return port @contextmanager From a11f507d78dbab5991e6da63a352a50d73aa6eaf Mon Sep 17 00:00:00 2001 From: galipremsagar Date: Tue, 10 May 2022 09:19:13 -0700 Subject: [PATCH 14/14] bname to bucket --- python/cudf/cudf/tests/test_s3.py | 110 +++++++++++++++--------------- 1 file changed, 55 insertions(+), 55 deletions(-) diff --git a/python/cudf/cudf/tests/test_s3.py b/python/cudf/cudf/tests/test_s3.py index eb739cf6c8a..0966bee93fd 100644 --- a/python/cudf/cudf/tests/test_s3.py +++ b/python/cudf/cudf/tests/test_s3.py @@ -145,13 +145,13 @@ def pdf_ext(scope="module"): def test_read_csv(s3_base, s3so, pdf, bytes_per_thread): # Write to buffer fname = "test_csv_reader.csv" - bname = "csv" + bucket = "csv" buffer = pdf.to_csv(index=False) # Use fsspec file object - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_csv( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, bytes_per_thread=bytes_per_thread, use_python_file_object=False, @@ -159,9 +159,9 @@ def test_read_csv(s3_base, s3so, pdf, bytes_per_thread): assert_eq(pdf, got) # Use Arrow PythonFile object - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_csv( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, bytes_per_thread=bytes_per_thread, use_python_file_object=True, @@ -172,13 +172,13 @@ def test_read_csv(s3_base, s3so, pdf, bytes_per_thread): def test_read_csv_arrow_nativefile(s3_base, s3so, pdf): # Write to buffer fname = "test_csv_reader_arrow_nativefile.csv" - bname = "csv" + bucket = "csv" buffer = pdf.to_csv(index=False) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): fs = pa_fs.S3FileSystem( endpoint_override=s3so["client_kwargs"]["endpoint_url"], ) - with fs.open_input_file(f"{bname}/{fname}") as fil: + with fs.open_input_file(f"{bucket}/{fname}") as fil: got = cudf.read_csv(fil) assert_eq(pdf, got) @@ -191,13 +191,13 @@ def test_read_csv_byte_range( ): # Write to buffer fname = "test_csv_reader_byte_range.csv" - bname = "csv" + bucket = "csv" buffer = pdf.to_csv(index=False) # Use fsspec file object - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_csv( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, byte_range=(74, 73), bytes_per_thread=bytes_per_thread, @@ -213,19 +213,19 @@ def test_read_csv_byte_range( def test_write_csv(s3_base, s3so, pdf, chunksize): # Write to buffer fname = "test_csv_writer.csv" - bname = "csv" + bucket = "csv" gdf = cudf.from_pandas(pdf) - with s3_context(s3_base=s3_base, bucket=bname) as s3fs: + with s3_context(s3_base=s3_base, bucket=bucket) as s3fs: gdf.to_csv( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", index=False, chunksize=chunksize, storage_options=s3so, ) - assert s3fs.exists(f"s3://{bname}/{fname}") + assert s3fs.exists(f"s3://{bucket}/{fname}") # TODO: Update to use `storage_options` from pandas v1.2.0 - got = pd.read_csv(s3fs.open(f"s3://{bname}/{fname}")) + got = pd.read_csv(s3fs.open(f"s3://{bucket}/{fname}")) assert_eq(pdf, got) @@ -244,15 
+244,15 @@ def test_read_parquet( use_python_file_object, ): fname = "test_parquet_reader.parquet" - bname = "parquet" + bucket = "parquet" buffer = BytesIO() pdf.to_parquet(path=buffer) # Check direct path handling buffer.seek(0) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got1 = cudf.read_parquet( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", open_file_options=( {"precache_options": {"method": precache}} if use_python_file_object @@ -268,11 +268,11 @@ def test_read_parquet( # Check fsspec file-object handling buffer.seek(0) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): - fs = get_fs_token_paths(f"s3://{bname}/{fname}", storage_options=s3so)[ - 0 - ] - with fs.open(f"s3://{bname}/{fname}", mode="rb") as f: + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): + fs = get_fs_token_paths( + f"s3://{bucket}/{fname}", storage_options=s3so + )[0] + with fs.open(f"s3://{bucket}/{fname}", mode="rb") as f: got2 = cudf.read_parquet( f, bytes_per_thread=bytes_per_thread, @@ -294,7 +294,7 @@ def test_read_parquet_ext( index, ): fname = "test_parquet_reader_ext.parquet" - bname = "parquet" + bucket = "parquet" buffer = BytesIO() if index: @@ -304,9 +304,9 @@ def test_read_parquet_ext( # Check direct path handling buffer.seek(0) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got1 = cudf.read_parquet( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, bytes_per_thread=bytes_per_thread, footer_sample_size=3200, @@ -327,15 +327,15 @@ def test_read_parquet_ext( def test_read_parquet_arrow_nativefile(s3_base, s3so, pdf, columns): # Write to buffer fname = "test_parquet_reader_arrow_nativefile.parquet" - bname = "parquet" + bucket = "parquet" buffer = BytesIO() pdf.to_parquet(path=buffer) buffer.seek(0) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): fs = pa_fs.S3FileSystem( endpoint_override=s3so["client_kwargs"]["endpoint_url"], ) - with fs.open_input_file(f"{bname}/{fname}") as fil: + with fs.open_input_file(f"{bucket}/{fname}") as fil: got = cudf.read_parquet(fil, columns=columns) expect = pdf[columns] if columns else pdf @@ -345,14 +345,14 @@ def test_read_parquet_arrow_nativefile(s3_base, s3so, pdf, columns): @pytest.mark.parametrize("precache", [None, "parquet"]) def test_read_parquet_filters(s3_base, s3so, pdf_ext, precache): fname = "test_parquet_reader_filters.parquet" - bname = "parquet" + bucket = "parquet" buffer = BytesIO() pdf_ext.to_parquet(path=buffer) buffer.seek(0) filters = [("String", "==", "Omega")] - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_parquet( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", storage_options=s3so, filters=filters, open_file_options={"precache_options": {"method": precache}}, @@ -366,28 +366,28 @@ def test_read_parquet_filters(s3_base, s3so, pdf_ext, precache): def test_write_parquet(s3_base, s3so, pdf, partition_cols): fname_cudf = "test_parquet_writer_cudf" fname_pandas = "test_parquet_writer_pandas" - bname = "parquet" + bucket = "parquet" gdf = cudf.from_pandas(pdf) - with s3_context(s3_base=s3_base, bucket=bname) as s3fs: + with s3_context(s3_base=s3_base, 
bucket=bucket) as s3fs: gdf.to_parquet( - f"s3://{bname}/{fname_cudf}", + f"s3://{bucket}/{fname_cudf}", partition_cols=partition_cols, storage_options=s3so, ) - assert s3fs.exists(f"s3://{bname}/{fname_cudf}") + assert s3fs.exists(f"s3://{bucket}/{fname_cudf}") pdf.to_parquet( - f"s3://{bname}/{fname_pandas}", + f"s3://{bucket}/{fname_pandas}", partition_cols=partition_cols, storage_options=s3so, ) - assert s3fs.exists(f"s3://{bname}/{fname_pandas}") + assert s3fs.exists(f"s3://{bucket}/{fname_pandas}") got = pd.read_parquet( - f"s3://{bname}/{fname_pandas}", storage_options=s3so + f"s3://{bucket}/{fname_pandas}", storage_options=s3so ) expect = cudf.read_parquet( - f"s3://{bname}/{fname_cudf}", storage_options=s3so + f"s3://{bucket}/{fname_cudf}", storage_options=s3so ) assert_eq(expect, got) @@ -395,7 +395,7 @@ def test_write_parquet(s3_base, s3so, pdf, partition_cols): def test_read_json(s3_base, s3so): fname = "test_json_reader.json" - bname = "json" + bucket = "json" # TODO: After following bug is fixed switch # back to using bytes: # https://github.com/pandas-dev/pandas/issues/46935 @@ -413,9 +413,9 @@ def test_read_json(s3_base, s3so): '{"amount": 400, "name": "Dennis"}\n' ) - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_json( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", engine="cudf", orient="records", lines=True, @@ -431,15 +431,15 @@ def test_read_json(s3_base, s3so): def test_read_orc(s3_base, s3so, datadir, use_python_file_object, columns): source_file = str(datadir / "orc" / "TestOrcFile.testSnappy.orc") fname = "test_orc_reader.orc" - bname = "orc" + bucket = "orc" expect = pa.orc.ORCFile(source_file).read().to_pandas() with open(source_file, "rb") as f: buffer = f.read() - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): got = cudf.read_orc( - f"s3://{bname}/{fname}", + f"s3://{bucket}/{fname}", columns=columns, storage_options=s3so, use_python_file_object=use_python_file_object, @@ -454,17 +454,17 @@ def test_read_orc(s3_base, s3so, datadir, use_python_file_object, columns): def test_read_orc_arrow_nativefile(s3_base, s3so, datadir, columns): source_file = str(datadir / "orc" / "TestOrcFile.testSnappy.orc") fname = "test_orc_reader.orc" - bname = "orc" + bucket = "orc" expect = pa.orc.ORCFile(source_file).read().to_pandas() with open(source_file, "rb") as f: buffer = f.read() - with s3_context(s3_base=s3_base, bucket=bname, files={fname: buffer}): + with s3_context(s3_base=s3_base, bucket=bucket, files={fname: buffer}): fs = pa_fs.S3FileSystem( endpoint_override=s3so["client_kwargs"]["endpoint_url"], ) - with fs.open_input_file(f"{bname}/{fname}") as fil: + with fs.open_input_file(f"{bucket}/{fname}") as fil: got = cudf.read_orc(fil, columns=columns) if columns: @@ -474,13 +474,13 @@ def test_read_orc_arrow_nativefile(s3_base, s3so, datadir, columns): def test_write_orc(s3_base, s3so, pdf): fname = "test_orc_writer.orc" - bname = "orc" + bucket = "orc" gdf = cudf.from_pandas(pdf) - with s3_context(s3_base=s3_base, bucket=bname) as s3fs: - gdf.to_orc(f"s3://{bname}/{fname}", storage_options=s3so) - assert s3fs.exists(f"s3://{bname}/{fname}") + with s3_context(s3_base=s3_base, bucket=bucket) as s3fs: + gdf.to_orc(f"s3://{bucket}/{fname}", storage_options=s3so) + assert s3fs.exists(f"s3://{bucket}/{fname}") - with s3fs.open(f"s3://{bname}/{fname}") as f: + 
   with s3fs.open(f"s3://{bname}/{fname}") as f:
+    with s3fs.open(f"s3://{bucket}/{fname}") as f:
         got = pa.orc.ORCFile(f).read().to_pandas()
 
     assert_eq(pdf, got)
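
Taken together, the series makes the following end-to-end pattern work: given an `s3://` path, `ParquetDatasetWriter` stages the partitioned dataset in a local temporary directory, and `close()` uploads that directory with `fs.put(local_path, s3_path, recursive=True)` before deleting it. Below is a sketch assembled from the class docstring and `test_write_chunked_parquet` above; the bucket name and endpoint URL are placeholders, and `storage_options` is simply forwarded through the writer's `**kwargs` to fsspec:

    import cudf
    from cudf.io.parquet import ParquetDatasetWriter

    df1 = cudf.DataFrame({"b": [10, 11, 12], "a": [1, 2, 3]})
    df2 = cudf.DataFrame({"b": [20, 30, 50], "a": [3, 2, 1]})

    # endpoint_url is only needed when targeting a non-AWS endpoint such
    # as the moto test server used in the tests above; against real S3,
    # s3fs falls back to its normal credential and endpoint resolution.
    s3so = {"client_kwargs": {"endpoint_url": "http://127.0.0.1:5555"}}

    with ParquetDatasetWriter(
        "s3://my-bucket/dataset",  # placeholder bucket and prefix
        partition_cols=["a"],
        storage_options=s3so,
    ) as cw:
        cw.write_table(df1)
        cw.write_table(df2)
    # Leaving the context calls close(), which uploads the staged local
    # directory to S3 and then removes it.

Staging locally and copying once in `close()` sidesteps the fact that S3 objects cannot be appended to, which is what makes a chunked, partitioned writer workable on object storage.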