10 changes: 5 additions & 5 deletions .github/workflows/python.yml
@@ -50,7 +50,7 @@ jobs:
- ubuntu-16.04-python-3
- conda-python-3.8-nopandas
- conda-python-3.6-pandas-0.23
- conda-python-3.6-pandas-latest
- conda-python-3.7-pandas-latest
- centos-python-3.6-manylinux1
include:
- name: ubuntu-16.04-python-3
@@ -70,11 +70,11 @@
title: AMD64 Conda Python 3.6 Pandas 0.23
python: 3.6
pandas: 0.23
- name: conda-python-3.6-pandas-latest
cache: conda-python-3.6
- name: conda-python-3.7-pandas-latest
cache: conda-python-3.7
image: conda-python-pandas
title: AMD64 Conda Python 3.6 Pandas latest
python: 3.6
title: AMD64 Conda Python 3.7 Pandas latest
python: 3.7
@jorisvandenbossche (Member, Author) commented on Nov 9, 2020:

@kszucs is it fine to simply change this like the above? Or does something else need to be updated as well?

CI for this build seems to pass, but the "Python / docker" one fails (this is fixed now)

(I am changing "pandas-latest" to use 3.7: right now we have many builds using 3.6 and almost none using 3.7; e.g. the pandas 0.23 build above also uses Python 3.6.)

@kszucs (Member) commented on Nov 9, 2020:

SGTM, the parametrization is right.

pandas: latest
- name: centos-python-3.6-manylinux1
cache: manylinux1
2 changes: 2 additions & 0 deletions ci/conda_env_python.yml
@@ -16,6 +16,7 @@
# under the License.

# don't add pandas here, because it is not a mandatory test dependency
boto3 # not a direct dependency of s3fs, but needed for our s3fs fixture
cffi
cython
cloudpickle
@@ -26,5 +27,6 @@ pytest
pytest-faulthandler
pytest-lazy-fixture
pytz
s3fs>=0.4
setuptools
setuptools_scm
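
For context on the new boto3 entry: the s3fs fixtures need a test bucket to exist on the S3 server before s3fs touches it, and boto3 is the client used to create it. A minimal sketch of that kind of setup, assuming a local MinIO-style endpoint; the host, credentials, and bucket name below are illustrative, not values taken from this PR:

import boto3

# Assumed local test server and credentials (illustrative only).
s3 = boto3.resource(
    's3',
    endpoint_url='http://localhost:9000',
    aws_access_key_id='arrow',
    aws_secret_access_key='apachearrow',
    region_name='us-east-1',
)
# Create the bucket that the s3fs-based tests will read from and write to.
s3.create_bucket(Bucket='pyarrow-filesystem')
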
51 changes: 35 additions & 16 deletions python/pyarrow/tests/test_fs.py
@@ -16,6 +16,7 @@
# under the License.

from datetime import datetime, timezone, timedelta
from distutils.version import LooseVersion
import gzip
import pathlib
import pickle
@@ -349,6 +350,8 @@ def py_fsspec_memoryfs(request, tempdir):
@pytest.fixture
def py_fsspec_s3fs(request, s3_connection, s3_server):
s3fs = pytest.importorskip("s3fs")
if sys.version_info < (3, 7) and s3fs.__version__ >= LooseVersion("0.5"):
pytest.skip("s3fs>=0.5 version is async and requires Python >= 3.7")

host, port, access_key, secret_key = s3_connection
bucket = 'pyarrow-filesystem/'
@@ -601,8 +604,6 @@ class Path:


def test_get_file_info(fs, pathfn):
skip_fsspec_s3fs(fs) # s3fs doesn't create nested directories

aaa = pathfn('a/aa/aaa/')
bb = pathfn('a/bb')
c = pathfn('c.txt')
@@ -619,6 +620,7 @@ def test_get_file_info(fs, pathfn):
assert aaa_info.path == aaa
assert 'aaa' in repr(aaa_info)
assert aaa_info.extension == ''
assert aaa_info.type == FileType.Directory
assert 'FileType.Directory' in repr(aaa_info)
assert aaa_info.size is None
check_mtime_or_absent(aaa_info)
@@ -629,7 +631,7 @@ def test_get_file_info(fs, pathfn):
assert bb_info.type == FileType.File
assert 'FileType.File' in repr(bb_info)
assert bb_info.size == 0
if fs.type_name != "py::fsspec+memory":
if fs.type_name not in ["py::fsspec+memory", "py::fsspec+s3"]:
check_mtime(bb_info)

assert c_info.path == str(c)
@@ -638,7 +640,7 @@ def test_get_file_info(fs, pathfn):
assert c_info.type == FileType.File
assert 'FileType.File' in repr(c_info)
assert c_info.size == 4
if fs.type_name != "py::fsspec+memory":
if fs.type_name not in ["py::fsspec+memory", "py::fsspec+s3"]:
check_mtime(c_info)

assert zzz_info.path == str(zzz)
@@ -657,12 +659,12 @@ def test_get_file_info(fs, pathfn):


def test_get_file_info_with_selector(fs, pathfn):
skip_fsspec_s3fs(fs)

base_dir = pathfn('selector-dir/')
file_a = pathfn('selector-dir/test_file_a')
file_b = pathfn('selector-dir/test_file_b')
dir_a = pathfn('selector-dir/test_dir_a')
file_c = pathfn('selector-dir/test_dir_a/test_file_c')
dir_b = pathfn('selector-dir/test_dir_b')

try:
fs.create_dir(base_dir)
@@ -671,34 +673,51 @@ def test_get_file_info_with_selector(fs, pathfn):
with fs.open_output_stream(file_b):
pass
fs.create_dir(dir_a)
with fs.open_output_stream(file_c):
pass
fs.create_dir(dir_b)

# recursive selector
selector = FileSelector(base_dir, allow_not_found=False,
recursive=True)
assert selector.base_dir == base_dir

infos = fs.get_file_info(selector)
assert len(infos) == 3
if fs.type_name == "py::fsspec+s3":
# s3fs only lists directories if they are not empty
assert len(infos) == 4
else:
assert len(infos) == 5

for info in infos:
if info.path.endswith(file_a):
assert info.type == FileType.File
elif info.path.endswith(file_b):
if (info.path.endswith(file_a) or info.path.endswith(file_b) or
info.path.endswith(file_c)):
assert info.type == FileType.File
elif info.path.rstrip("/").endswith(dir_a):
elif (info.path.rstrip("/").endswith(dir_a) or
info.path.rstrip("/").endswith(dir_b)):
assert info.type == FileType.Directory
else:
raise ValueError('unexpected path {}'.format(info.path))
check_mtime_or_absent(info)

# non-recursive selector -> not selecting the nested file_c
selector = FileSelector(base_dir, recursive=False)

infos = fs.get_file_info(selector)
if fs.type_name == "py::fsspec+s3":
# s3fs only lists directories if they are not empty
assert len(infos) == 3
else:
assert len(infos) == 4

finally:
fs.delete_file(file_a)
fs.delete_file(file_b)
fs.delete_dir(dir_a)
fs.delete_dir(base_dir)


def test_create_dir(fs, pathfn):
skip_fsspec_s3fs(fs) # create_dir doesn't create dir, so delete dir fails

# s3fs fails to delete a dir if it is empty
# (https://github.com/dask/s3fs/issues/317)
skip_fsspec_s3fs(fs)
d = pathfn('test-directory/')

with pytest.raises(pa.ArrowIOError):
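
For reference, the "py::fsspec+s3" type name checked in the tests above comes from wrapping an fsspec S3 filesystem with pyarrow's FSSpecHandler. A minimal sketch of how such a filesystem is built and queried; the endpoint, credentials, and bucket are assumptions for illustration, not values from this PR:

import s3fs
from pyarrow.fs import FileSelector, FSSpecHandler, PyFileSystem

# Wrap an fsspec S3 filesystem (assumed local test endpoint and credentials).
fs = PyFileSystem(FSSpecHandler(s3fs.S3FileSystem(
    key='arrow',
    secret='apachearrow',
    client_kwargs={'endpoint_url': 'http://localhost:9000'},
)))
print(fs.type_name)  # -> "py::fsspec+s3"

# "Directories" are virtual on S3, so a recursive listing only reports
# directories that contain at least one object -- hence the adjusted
# file-info counts in test_get_file_info_with_selector above.
infos = fs.get_file_info(FileSelector('pyarrow-filesystem/', recursive=True))
for info in infos:
    print(info.path, info.type)
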
113 changes: 96 additions & 17 deletions python/pyarrow/tests/test_parquet.py
@@ -2190,10 +2190,38 @@ def s3_example_s3fs(s3_connection, s3_server, s3_bucket):
pass


@parametrize_legacy_dataset
def test_read_s3fs(s3_example_s3fs, use_legacy_dataset):
fs, path = s3_example_s3fs
path = path + "/test.parquet"
table = pa.table({"a": [1, 2, 3]})
_write_table(table, path, filesystem=fs)

result = _read_table(
path, filesystem=fs, use_legacy_dataset=use_legacy_dataset
)
assert result.equals(table)


@parametrize_legacy_dataset
def test_read_directory_s3fs(s3_example_s3fs, use_legacy_dataset):
fs, directory = s3_example_s3fs
path = directory + "/test.parquet"
table = pa.table({"a": [1, 2, 3]})
_write_table(table, path, filesystem=fs)

result = _read_table(
directory, filesystem=fs, use_legacy_dataset=use_legacy_dataset
)
assert result.equals(table)


@pytest.mark.pandas
@pytest.mark.s3
@parametrize_legacy_dataset
def test_read_partitioned_directory_s3fs(s3_example_s3fs, use_legacy_dataset):
def test_read_partitioned_directory_s3fs_wrapper(
s3_example_s3fs, use_legacy_dataset
):
from pyarrow.filesystem import S3FSWrapper

fs, path = s3_example_s3fs
@@ -2207,6 +2235,15 @@ def test_read_partitioned_directory_s3fs(s3_example_s3fs, use_legacy_dataset):
dataset.read()


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_read_partitioned_directory_s3fs(s3_example_s3fs, use_legacy_dataset):
fs, path = s3_example_s3fs
_partition_test_for_filesystem(
fs, path, use_legacy_dataset=use_legacy_dataset
)


def _partition_test_for_filesystem(fs, base_path, use_legacy_dataset=True):
foo_keys = [0, 1]
bar_keys = ['a', 'b', 'c']
@@ -2250,32 +2287,34 @@ def _generate_partition_directories(fs, base_dir, partition_spec, df):
# part_table : a pyarrow.Table to write to each partition
DEPTH = len(partition_spec)

pathsep = getattr(fs, "pathsep", getattr(fs, "sep", "/"))

def _visit_level(base_dir, level, part_keys):
name, values = partition_spec[level]
for value in values:
this_part_keys = part_keys + [(name, value)]

level_dir = fs._path_join(
level_dir = pathsep.join([
str(base_dir),
'{}={}'.format(name, value)
)
])
fs.mkdir(level_dir)

if level == DEPTH - 1:
# Generate example data
file_path = fs._path_join(level_dir, guid())
file_path = pathsep.join([level_dir, guid()])
filtered_df = _filter_partition(df, this_part_keys)
part_table = pa.Table.from_pandas(filtered_df)
with fs.open(file_path, 'wb') as f:
_write_table(part_table, f)
assert fs.exists(file_path)

file_success = fs._path_join(level_dir, '_SUCCESS')
file_success = pathsep.join([level_dir, '_SUCCESS'])
with fs.open(file_success, 'wb') as f:
pass
else:
_visit_level(level_dir, level + 1, this_part_keys)
file_success = fs._path_join(level_dir, '_SUCCESS')
file_success = pathsep.join([level_dir, '_SUCCESS'])
with fs.open(file_success, 'wb') as f:
pass
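
The getattr fallback above is needed because the legacy pyarrow.filesystem classes expose the path separator as pathsep, while fsspec filesystems such as s3fs expose it as sep. A small illustration using fsspec's in-memory filesystem (fsspec assumed available; this is not code from the PR):

import fsspec

fs = fsspec.filesystem('memory')
# Legacy filesystems would match the first branch; fsspec ones match the second.
pathsep = getattr(fs, 'pathsep', getattr(fs, 'sep', '/'))
assert pathsep == '/'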

@@ -2969,22 +3008,45 @@ def test_write_to_dataset_pathlib(tempdir, use_legacy_dataset):
tempdir / "test2", use_legacy_dataset)


# Those tests are failing - see ARROW-10370
# @pytest.mark.pandas
# @pytest.mark.s3
# @parametrize_legacy_dataset
# def test_write_to_dataset_pathlib_nonlocal(
# tempdir, s3_example_s3fs, use_legacy_dataset
# ):
# # pathlib paths are only accepted for local files
# fs, _ = s3_example_s3fs

# with pytest.raises(TypeError, match="path-like objects are only allowed"):
# _test_write_to_dataset_with_partitions(
# tempdir / "test1", use_legacy_dataset, filesystem=fs)

# with pytest.raises(TypeError, match="path-like objects are only allowed"):
# _test_write_to_dataset_no_partitions(
# tempdir / "test2", use_legacy_dataset, filesystem=fs)


@pytest.mark.pandas
@pytest.mark.s3
@parametrize_legacy_dataset
def test_write_to_dataset_pathlib_nonlocal(
tempdir, s3_example_s3fs, use_legacy_dataset
def test_write_to_dataset_with_partitions_s3fs(
s3_example_s3fs, use_legacy_dataset
):
# pathlib paths are only accepted for local files
fs, _ = s3_example_s3fs
fs, path = s3_example_s3fs

_test_write_to_dataset_with_partitions(
path, use_legacy_dataset, filesystem=fs)

with pytest.raises(TypeError, match="path-like objects are only allowed"):
_test_write_to_dataset_with_partitions(
tempdir / "test1", use_legacy_dataset, filesystem=fs)

with pytest.raises(TypeError, match="path-like objects are only allowed"):
_test_write_to_dataset_no_partitions(
tempdir / "test2", use_legacy_dataset, filesystem=fs)
@pytest.mark.pandas
@parametrize_legacy_dataset
def test_write_to_dataset_no_partitions_s3fs(
s3_example_s3fs, use_legacy_dataset
):
fs, path = s3_example_s3fs

_test_write_to_dataset_no_partitions(
path, use_legacy_dataset, filesystem=fs)


@pytest.mark.pandas
Expand Down Expand Up @@ -3641,6 +3703,23 @@ def test_parquet_writer_filesystem_s3_uri(s3_example_fs):
tm.assert_frame_equal(result, df)


@pytest.mark.pandas
def test_parquet_writer_filesystem_s3fs(s3_example_s3fs):
df = _test_dataframe(100)
table = pa.Table.from_pandas(df, preserve_index=False)

fs, directory = s3_example_s3fs
path = directory + "/test.parquet"

with pq.ParquetWriter(
path, table.schema, filesystem=fs, version='2.0'
) as writer:
writer.write_table(table)

result = _read_table(path, filesystem=fs).to_pandas()
tm.assert_frame_equal(result, df)


@pytest.mark.pandas
def test_parquet_writer_filesystem_buffer_raises():
df = _test_dataframe(100)
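
To make the intent of the new s3fs dataset tests concrete, here is a minimal sketch of writing a partitioned dataset through s3fs and reading it back; the endpoint, credentials, and bucket path are illustrative assumptions, and the test helpers themselves are not reproduced here:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs

# Assumed local test endpoint and credentials (illustrative only).
fs = s3fs.S3FileSystem(
    key='arrow',
    secret='apachearrow',
    client_kwargs={'endpoint_url': 'http://localhost:9000'},
)

df = pd.DataFrame({'foo': [0, 0, 1, 1],
                   'bar': ['a', 'b', 'a', 'b'],
                   'values': [1.0, 2.0, 3.0, 4.0]})
table = pa.Table.from_pandas(df)

# One Parquet file per (foo, bar) partition is written under the bucket path.
pq.write_to_dataset(table, 'pyarrow-filesystem/dataset',
                    partition_cols=['foo', 'bar'], filesystem=fs)

result = pq.read_table('pyarrow-filesystem/dataset', filesystem=fs)
print(result.num_rows)  # -> 4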