diff --git a/.github/workflows/python.yml b/.github/workflows/python.yml index c0582b4ee64..6b84ed2cdb8 100644 --- a/.github/workflows/python.yml +++ b/.github/workflows/python.yml @@ -50,7 +50,7 @@ jobs: - ubuntu-16.04-python-3 - conda-python-3.8-nopandas - conda-python-3.6-pandas-0.23 - - conda-python-3.6-pandas-latest + - conda-python-3.7-pandas-latest - centos-python-3.6-manylinux1 include: - name: ubuntu-16.04-python-3 @@ -70,11 +70,11 @@ jobs: title: AMD64 Conda Python 3.6 Pandas 0.23 python: 3.6 pandas: 0.23 - - name: conda-python-3.6-pandas-latest - cache: conda-python-3.6 + - name: conda-python-3.7-pandas-latest + cache: conda-python-3.7 image: conda-python-pandas - title: AMD64 Conda Python 3.6 Pandas latest - python: 3.6 + title: AMD64 Conda Python 3.7 Pandas latest + python: 3.7 pandas: latest - name: centos-python-3.6-manylinux1 cache: manylinux1 diff --git a/ci/conda_env_python.yml b/ci/conda_env_python.yml index f6c89923870..f2f46c84436 100644 --- a/ci/conda_env_python.yml +++ b/ci/conda_env_python.yml @@ -16,6 +16,7 @@ # under the License. # don't add pandas here, because it is not a mandatory test dependency +boto3 # not a direct dependency of s3fs, but needed for our s3fs fixture cffi cython cloudpickle @@ -26,5 +27,6 @@ pytest pytest-faulthandler pytest-lazy-fixture pytz +s3fs>=0.4 setuptools setuptools_scm diff --git a/python/pyarrow/tests/test_fs.py b/python/pyarrow/tests/test_fs.py index 08179d15927..4f926b4897c 100644 --- a/python/pyarrow/tests/test_fs.py +++ b/python/pyarrow/tests/test_fs.py @@ -16,6 +16,7 @@ # under the License. from datetime import datetime, timezone, timedelta +from distutils.version import LooseVersion import gzip import pathlib import pickle @@ -349,6 +350,8 @@ def py_fsspec_memoryfs(request, tempdir): @pytest.fixture def py_fsspec_s3fs(request, s3_connection, s3_server): s3fs = pytest.importorskip("s3fs") + if sys.version_info < (3, 7) and s3fs.__version__ >= LooseVersion("0.5"): + pytest.skip("s3fs>=0.5 version is async and requires Python >= 3.7") host, port, access_key, secret_key = s3_connection bucket = 'pyarrow-filesystem/' @@ -601,8 +604,6 @@ class Path: def test_get_file_info(fs, pathfn): - skip_fsspec_s3fs(fs) # s3fs doesn't create nested directories - aaa = pathfn('a/aa/aaa/') bb = pathfn('a/bb') c = pathfn('c.txt') @@ -619,6 +620,7 @@ def test_get_file_info(fs, pathfn): assert aaa_info.path == aaa assert 'aaa' in repr(aaa_info) assert aaa_info.extension == '' + assert aaa_info.type == FileType.Directory assert 'FileType.Directory' in repr(aaa_info) assert aaa_info.size is None check_mtime_or_absent(aaa_info) @@ -629,7 +631,7 @@ def test_get_file_info(fs, pathfn): assert bb_info.type == FileType.File assert 'FileType.File' in repr(bb_info) assert bb_info.size == 0 - if fs.type_name != "py::fsspec+memory": + if fs.type_name not in ["py::fsspec+memory", "py::fsspec+s3"]: check_mtime(bb_info) assert c_info.path == str(c) @@ -638,7 +640,7 @@ def test_get_file_info(fs, pathfn): assert c_info.type == FileType.File assert 'FileType.File' in repr(c_info) assert c_info.size == 4 - if fs.type_name != "py::fsspec+memory": + if fs.type_name not in ["py::fsspec+memory", "py::fsspec+s3"]: check_mtime(c_info) assert zzz_info.path == str(zzz) @@ -657,12 +659,12 @@ def test_get_file_info(fs, pathfn): def test_get_file_info_with_selector(fs, pathfn): - skip_fsspec_s3fs(fs) - base_dir = pathfn('selector-dir/') file_a = pathfn('selector-dir/test_file_a') file_b = pathfn('selector-dir/test_file_b') dir_a = pathfn('selector-dir/test_dir_a') + file_c = 
pathfn('selector-dir/test_dir_a/test_file_c') + dir_b = pathfn('selector-dir/test_dir_b') try: fs.create_dir(base_dir) @@ -671,34 +673,51 @@ def test_get_file_info_with_selector(fs, pathfn): with fs.open_output_stream(file_b): pass fs.create_dir(dir_a) + with fs.open_output_stream(file_c): + pass + fs.create_dir(dir_b) + # recursive selector selector = FileSelector(base_dir, allow_not_found=False, recursive=True) assert selector.base_dir == base_dir infos = fs.get_file_info(selector) - assert len(infos) == 3 + if fs.type_name == "py::fsspec+s3": + # s3fs only lists directories if they are not empty + assert len(infos) == 4 + else: + assert len(infos) == 5 for info in infos: - if info.path.endswith(file_a): - assert info.type == FileType.File - elif info.path.endswith(file_b): + if (info.path.endswith(file_a) or info.path.endswith(file_b) or + info.path.endswith(file_c)): assert info.type == FileType.File - elif info.path.rstrip("/").endswith(dir_a): + elif (info.path.rstrip("/").endswith(dir_a) or + info.path.rstrip("/").endswith(dir_b)): assert info.type == FileType.Directory else: raise ValueError('unexpected path {}'.format(info.path)) check_mtime_or_absent(info) + + # non-recursive selector -> not selecting the nested file_c + selector = FileSelector(base_dir, recursive=False) + + infos = fs.get_file_info(selector) + if fs.type_name == "py::fsspec+s3": + # s3fs only lists directories if they are not empty + assert len(infos) == 3 + else: + assert len(infos) == 4 + finally: - fs.delete_file(file_a) - fs.delete_file(file_b) - fs.delete_dir(dir_a) fs.delete_dir(base_dir) def test_create_dir(fs, pathfn): - skip_fsspec_s3fs(fs) # create_dir doesn't create dir, so delete dir fails - + # s3fs fails deleting dir fails if it is empty + # (https://github.com/dask/s3fs/issues/317) + skip_fsspec_s3fs(fs) d = pathfn('test-directory/') with pytest.raises(pa.ArrowIOError): diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py index ae3dd0b3094..cf172589374 100644 --- a/python/pyarrow/tests/test_parquet.py +++ b/python/pyarrow/tests/test_parquet.py @@ -2190,10 +2190,38 @@ def s3_example_s3fs(s3_connection, s3_server, s3_bucket): pass +@parametrize_legacy_dataset +def test_read_s3fs(s3_example_s3fs, use_legacy_dataset): + fs, path = s3_example_s3fs + path = path + "/test.parquet" + table = pa.table({"a": [1, 2, 3]}) + _write_table(table, path, filesystem=fs) + + result = _read_table( + path, filesystem=fs, use_legacy_dataset=use_legacy_dataset + ) + assert result.equals(table) + + +@parametrize_legacy_dataset +def test_read_directory_s3fs(s3_example_s3fs, use_legacy_dataset): + fs, directory = s3_example_s3fs + path = directory + "/test.parquet" + table = pa.table({"a": [1, 2, 3]}) + _write_table(table, path, filesystem=fs) + + result = _read_table( + directory, filesystem=fs, use_legacy_dataset=use_legacy_dataset + ) + assert result.equals(table) + + @pytest.mark.pandas @pytest.mark.s3 @parametrize_legacy_dataset -def test_read_partitioned_directory_s3fs(s3_example_s3fs, use_legacy_dataset): +def test_read_partitioned_directory_s3fs_wrapper( + s3_example_s3fs, use_legacy_dataset +): from pyarrow.filesystem import S3FSWrapper fs, path = s3_example_s3fs @@ -2207,6 +2235,15 @@ def test_read_partitioned_directory_s3fs(s3_example_s3fs, use_legacy_dataset): dataset.read() +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_read_partitioned_directory_s3fs(s3_example_s3fs, use_legacy_dataset): + fs, path = s3_example_s3fs + _partition_test_for_filesystem( + fs, path, 
use_legacy_dataset=use_legacy_dataset + ) + + def _partition_test_for_filesystem(fs, base_path, use_legacy_dataset=True): foo_keys = [0, 1] bar_keys = ['a', 'b', 'c'] @@ -2250,32 +2287,34 @@ def _generate_partition_directories(fs, base_dir, partition_spec, df): # part_table : a pyarrow.Table to write to each partition DEPTH = len(partition_spec) + pathsep = getattr(fs, "pathsep", getattr(fs, "sep", "/")) + def _visit_level(base_dir, level, part_keys): name, values = partition_spec[level] for value in values: this_part_keys = part_keys + [(name, value)] - level_dir = fs._path_join( + level_dir = pathsep.join([ str(base_dir), '{}={}'.format(name, value) - ) + ]) fs.mkdir(level_dir) if level == DEPTH - 1: # Generate example data - file_path = fs._path_join(level_dir, guid()) + file_path = pathsep.join([level_dir, guid()]) filtered_df = _filter_partition(df, this_part_keys) part_table = pa.Table.from_pandas(filtered_df) with fs.open(file_path, 'wb') as f: _write_table(part_table, f) assert fs.exists(file_path) - file_success = fs._path_join(level_dir, '_SUCCESS') + file_success = pathsep.join([level_dir, '_SUCCESS']) with fs.open(file_success, 'wb') as f: pass else: _visit_level(level_dir, level + 1, this_part_keys) - file_success = fs._path_join(level_dir, '_SUCCESS') + file_success = pathsep.join([level_dir, '_SUCCESS']) with fs.open(file_success, 'wb') as f: pass @@ -2969,22 +3008,45 @@ def test_write_to_dataset_pathlib(tempdir, use_legacy_dataset): tempdir / "test2", use_legacy_dataset) +# Those tests are failing - see ARROW-10370 +# @pytest.mark.pandas +# @pytest.mark.s3 +# @parametrize_legacy_dataset +# def test_write_to_dataset_pathlib_nonlocal( +# tempdir, s3_example_s3fs, use_legacy_dataset +# ): +# # pathlib paths are only accepted for local files +# fs, _ = s3_example_s3fs + +# with pytest.raises(TypeError, match="path-like objects are only allowed"): +# _test_write_to_dataset_with_partitions( +# tempdir / "test1", use_legacy_dataset, filesystem=fs) + +# with pytest.raises(TypeError, match="path-like objects are only allowed"): +# _test_write_to_dataset_no_partitions( +# tempdir / "test2", use_legacy_dataset, filesystem=fs) + + @pytest.mark.pandas -@pytest.mark.s3 @parametrize_legacy_dataset -def test_write_to_dataset_pathlib_nonlocal( - tempdir, s3_example_s3fs, use_legacy_dataset +def test_write_to_dataset_with_partitions_s3fs( + s3_example_s3fs, use_legacy_dataset ): - # pathlib paths are only accepted for local files - fs, _ = s3_example_s3fs + fs, path = s3_example_s3fs + + _test_write_to_dataset_with_partitions( + path, use_legacy_dataset, filesystem=fs) - with pytest.raises(TypeError, match="path-like objects are only allowed"): - _test_write_to_dataset_with_partitions( - tempdir / "test1", use_legacy_dataset, filesystem=fs) - with pytest.raises(TypeError, match="path-like objects are only allowed"): - _test_write_to_dataset_no_partitions( - tempdir / "test2", use_legacy_dataset, filesystem=fs) +@pytest.mark.pandas +@parametrize_legacy_dataset +def test_write_to_dataset_no_partitions_s3fs( + s3_example_s3fs, use_legacy_dataset +): + fs, path = s3_example_s3fs + + _test_write_to_dataset_no_partitions( + path, use_legacy_dataset, filesystem=fs) @pytest.mark.pandas @@ -3641,6 +3703,23 @@ def test_parquet_writer_filesystem_s3_uri(s3_example_fs): tm.assert_frame_equal(result, df) +@pytest.mark.pandas +def test_parquet_writer_filesystem_s3fs(s3_example_s3fs): + df = _test_dataframe(100) + table = pa.Table.from_pandas(df, preserve_index=False) + + fs, directory = s3_example_s3fs + 
path = directory + "/test.parquet" + + with pq.ParquetWriter( + path, table.schema, filesystem=fs, version='2.0' + ) as writer: + writer.write_table(table) + + result = _read_table(path, filesystem=fs).to_pandas() + tm.assert_frame_equal(result, df) + + @pytest.mark.pandas def test_parquet_writer_filesystem_buffer_raises(): df = _test_dataframe(100)
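The test_fs.py changes above exercise s3fs through pyarrow's fsspec-based filesystem handler. A minimal standalone sketch of that wrapping pattern follows; the endpoint URL, credentials and bucket name are illustrative placeholders (the test fixtures obtain the real values from s3_connection/s3_server), so treat this as an assumption-laden example rather than code from this patch.

```python
# Sketch: expose an s3fs (fsspec) filesystem through pyarrow's filesystem API,
# as the py_fsspec_s3fs-based tests do. Endpoint, credentials and bucket below
# are placeholders, not values taken from this patch.
import s3fs
from pyarrow.fs import FileSelector, FileType, FSSpecHandler, PyFileSystem

s3 = s3fs.S3FileSystem(
    key="minio",                                  # placeholder access key
    secret="miniopass",                           # placeholder secret key
    client_kwargs={"endpoint_url": "http://localhost:9000"},  # placeholder
)
fs = PyFileSystem(FSSpecHandler(s3))

base = "pyarrow-filesystem/selector-dir"          # placeholder bucket/prefix
fs.create_dir(base)
with fs.open_output_stream(base + "/test_file_a") as out:
    out.write(b"data")

# A FileSelector yields FileInfo objects, as asserted in the tests above.
infos = fs.get_file_info(FileSelector(base, recursive=True))
assert any(info.type == FileType.File for info in infos)
```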
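The test_parquet.py additions reduce to passing an s3fs filesystem straight to the Parquet helpers via the filesystem keyword. A minimal sketch under the same placeholder assumptions:

```python
# Sketch: Parquet round trip through s3fs, mirroring test_read_s3fs and
# test_write_to_dataset_with_partitions_s3fs above. Endpoint, credentials and
# bucket are placeholders, not values taken from this patch.
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem(
    key="minio",                                  # placeholder access key
    secret="miniopass",                           # placeholder secret key
    client_kwargs={"endpoint_url": "http://localhost:9000"},  # placeholder
)

table = pa.table({"a": [1, 2, 3]})
path = "my-bucket/test.parquet"                   # placeholder bucket/key

pq.write_table(table, path, filesystem=fs)        # single-file write
assert pq.read_table(path, filesystem=fs).equals(table)

# Partitioned writes take the same keyword; note that s3fs>=0.5 is async and
# needs Python >= 3.7, per the fixture guard added in test_fs.py above.
pq.write_to_dataset(table, "my-bucket/dataset", filesystem=fs)
```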