diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 788777ebc48..f7a24c82876 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -21,6 +21,10 @@
 import sys
+import pyarrow as pa
+from pyarrow.util import _stringify_path, _is_path_like
+
+
 if sys.version_info < (3,):
     raise ImportError("Python Dataset bindings require Python 3")
@@ -53,3 +57,259 @@
     Source,
     TreeSource,
 )
+
+
+def partitioning(field_names=None, flavor=None):
+    """
+    Specify a partitioning scheme.
+
+    The supported schemes include:
+
+    - "DirectoryPartitioning": this scheme expects one segment in the file
+      path for each field in the specified schema (all fields are required
+      to be present). For example, given a schema with fields "year" and
+      "month", the path "/2009/11" would be parsed to
+      ("year"_ == 2009 and "month"_ == 11).
+    - "HivePartitioning": a scheme for "/$key=$value/" nested directories
+      as found in Apache Hive. This is a multi-level, directory based
+      partitioning scheme. Data is partitioned by static values of a
+      particular column in the schema. Partition keys are represented in
+      the form $key=$value in directory names. Field order is ignored, as
+      are missing or unrecognized field names.
+      For example, given a schema with fields "year", "month" and "day", a
+      possible path would be "/year=2009/month=11/day=15" (but the field
+      order does not need to match).
+
+    Parameters
+    ----------
+    field_names : pyarrow.Schema or list of str
+        The schema that describes the partitions present in the file path.
+        If a list of strings (field names) is passed, the schema's types
+        are inferred from the file paths (only valid for
+        DirectoryPartitioning).
+    flavor : str, default None
+        The default is DirectoryPartitioning. Specify ``flavor="hive"`` for
+        a HivePartitioning.
+
+    Returns
+    -------
+    Partitioning or PartitioningFactory
+
+    Examples
+    --------
+
+    Specify the Schema for paths like "/2009/June":
+
+    >>> partitioning(pa.schema([("year", pa.int16()), ("month", pa.string())]))
+
+    or let the types be inferred by only specifying the field names:
+
+    >>> partitioning(["year", "month"])
+
+    For paths like "/2009/June", the year will be inferred as int32 while
+    month will be inferred as string.
+
+    Create a Hive scheme for a path like "/year=2009/month=11":
+
+    >>> partitioning(
+    ...     pa.schema([("year", pa.int16()), ("month", pa.int8())]),
+    ...     flavor="hive")
+
+    A Hive scheme can also be discovered from the directory structure (and
+    types will be inferred):
+
+    >>> partitioning(flavor="hive")
+
+    """
+    if flavor is None:
+        # default flavor
+        if isinstance(field_names, pa.Schema):
+            return DirectoryPartitioning(field_names)
+        elif isinstance(field_names, list):
+            return DirectoryPartitioning.discover(field_names)
+        elif field_names is None:
+            raise ValueError(
+                "For the default directory flavor, need to specify "
+                "'field_names' as Schema or list of field names")
+        else:
+            raise ValueError(
+                "Expected Schema or list of field names, got {0}".format(
+                    type(field_names)))
+    elif flavor == 'hive':
+        if isinstance(field_names, pa.Schema):
+            return HivePartitioning(field_names)
+        elif field_names is None:
+            return HivePartitioning.discover()
+        else:
+            raise ValueError(
+                "Expected Schema or None for 'field_names', got {0}".format(
+                    type(field_names)))
+    else:
+        raise ValueError("Unsupported flavor")
+
+
+def _ensure_fs_and_paths(path_or_paths, filesystem=None):
+    # Validate and convert the path-likes and filesystem.
+    # Returns filesystem and list of string paths or FileSelector
+    from pyarrow.fs import FileSystem, FileType, FileSelector
+
+    if isinstance(path_or_paths, list):
+        paths_or_selector = [_stringify_path(path) for path in path_or_paths]
+        if filesystem is None:
+            # infer from first path
+            filesystem, _ = FileSystem.from_uri(paths_or_selector[0])
+    else:
+        path = _stringify_path(path_or_paths)
+        if filesystem is None:
+            filesystem, path = FileSystem.from_uri(path)
+
+        stats = filesystem.get_target_stats([path])[0]
+        if stats.type == FileType.Directory:
+            # for directory, pass a selector
+            paths_or_selector = FileSelector(path, recursive=True)
+        elif stats.type == FileType.File:
+            # for a single file path, pass it as a list
+            paths_or_selector = [path]
+        else:
+            raise FileNotFoundError(path)
+
+    return filesystem, paths_or_selector
+
+
+def _ensure_partitioning(scheme):
+    # Validate input and return a Partitioning(Factory) or passthrough None
+    # for no partitioning
+    if scheme is None:
+        pass
+    elif isinstance(scheme, str):
+        scheme = partitioning(flavor=scheme)
+    elif isinstance(scheme, (Partitioning, PartitioningFactory)):
+        pass
+    else:
+        raise ValueError(
+            "Expected Partitioning or PartitioningFactory, got {0}".format(
+                type(scheme)))
+    return scheme
+
+
+def source(path_or_paths, filesystem=None, partitioning=None,
+           format=None):
+    """
+    Open a (multi-file) data source.
+
+    Parameters
+    ----------
+    path_or_paths : str, pathlib.Path, or list of those
+        Path to a file or to a directory containing the data files, or
+        a list of paths.
+    filesystem : FileSystem, default None
+        By default will be inferred from the path.
+    partitioning : Partitioning(Factory) or str
+        The partitioning scheme specified with the ``partitioning()``
+        function. A flavor string can be used as shortcut.
+    format : str
+        Currently only "parquet" is supported.
+
+    Returns
+    -------
+    DataSource or DataSourceDiscovery
+
+    """
+    filesystem, paths_or_selector = _ensure_fs_and_paths(
+        path_or_paths, filesystem)
+
+    partitioning = _ensure_partitioning(partitioning)
+
+    format = format or "parquet"
+    if format == "parquet":
+        format = ParquetFileFormat()
+    elif not isinstance(format, FileFormat):
+        raise ValueError("format '{0}' is not supported".format(format))
+
+    # TODO pass through options
+    options = FileSystemFactoryOptions()
+
+    if isinstance(partitioning, PartitioningFactory):
+        options.partitioning_factory = partitioning
+    elif isinstance(partitioning, Partitioning):
+        options.partitioning = partitioning
+
+    discovery = FileSystemSourceFactory(
+        filesystem, paths_or_selector, format, options)
+
+    # TODO return Source if a specific schema was passed?
+
+    # need to return SourceFactory since `dataset` might need to
+    # finish the factory with a unified schema
+    return discovery
+
+
+def _ensure_source(src, filesystem=None, partitioning=None, format=None):
+    if _is_path_like(src):
+        src = source(src, filesystem=filesystem,
+                     partitioning=partitioning, format=format)
+    # TODO also accept Source?
+    elif isinstance(src, FileSystemSourceFactory):
+        # when passing a SourceFactory, the arguments cannot be specified
+        if any(kwarg is not None
+               for kwarg in [filesystem, partitioning, format]):
+            raise ValueError(
+                "When passing a Source(Factory), you cannot pass any "
+                "additional arguments")
+    else:
+        raise ValueError("Expected a path-like or Source, got {0}".format(
+            type(src)))
+    return src
+
+
+def dataset(sources, filesystem=None, partitioning=None, format=None):
+    """
+    Open a (multi-source) dataset.
+
+    Parameters
+    ----------
+    sources : path or list of paths or sources
+        Path to a file or to a directory containing the data files, or a
+        list of paths for a multi-source dataset. To have more control, a
+        list of sources can be passed, created with the ``source()``
+        function (in this case, the additional keywords will be ignored).
+    filesystem : FileSystem, default None
+        By default will be inferred from the path.
+    partitioning : Partitioning(Factory) or str
+        The partitioning scheme specified with the ``partitioning()``
+        function. A flavor string can be used as shortcut.
+    format : str
+        Currently only "parquet" is supported.
+
+    Returns
+    -------
+    Dataset
+
+    Examples
+    --------
+    Opening a dataset for a single directory:
+
+    >>> dataset("path/to/nyc-taxi/", format="parquet")
+
+    Combining different sources:
+
+    >>> dataset([
+    ...     source("s3://old-taxi-data", format="parquet"),
+    ...     source("local/path/to/new/data", format="csv")
+    ... ])
+
+    """
+    if not isinstance(sources, list):
+        sources = [sources]
+    sources = [
+        _ensure_source(src, filesystem=filesystem,
+                       partitioning=partitioning, format=format)
+        for src in sources
+    ]
+
+    # TEMP: for now only deal with a single source
+
+    if len(sources) > 1:
+        raise NotImplementedError("only a single source is supported for now")
+
+    discovery = sources[0]
+    inspected_schema = discovery.inspect()
+    return Dataset([discovery.finish()], inspected_schema)
diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index b0c8cae8ebd..976c29171cf 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import numpy as np
+
 import pytest
 
 import pyarrow as pa
@@ -379,7 +381,181 @@ def test_paritioning_factory(mockfs):
         ("group", pa.int32()),
         ("key", pa.string()),
     ])
-    assert inspected_schema.remove_metadata().equals(expected_schema)
+    assert inspected_schema.equals(expected_schema, check_metadata=False)
 
     hive_partitioning_factory = ds.HivePartitioning.discover()
     assert isinstance(hive_partitioning_factory, ds.PartitioningFactory)
+
+
+def test_partitioning_function():
+    schema = pa.schema([("year", pa.int16()), ("month", pa.int8())])
+    names = ["year", "month"]
+
+    # default DirectoryPartitioning
+
+    part = ds.partitioning(schema)
+    assert isinstance(part, ds.DirectoryPartitioning)
+    part = ds.partitioning(names)
+    assert isinstance(part, ds.PartitioningFactory)
+    # needs schema or names
+    with pytest.raises(ValueError):
+        ds.partitioning()
+
+    # Hive partitioning
+
+    part = ds.partitioning(schema, flavor="hive")
+    assert isinstance(part, ds.HivePartitioning)
+    part = ds.partitioning(flavor="hive")
+    assert isinstance(part, ds.PartitioningFactory)
+    # cannot pass list of names
+    with pytest.raises(ValueError):
+        ds.partitioning(names, flavor="hive")
+
+    # unsupported flavor
+    with pytest.raises(ValueError):
+        ds.partitioning(schema, flavor="unsupported")
+
+
+def _create_single_file(base_dir):
+    import pyarrow.parquet as pq
+    table = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5})
+    path = base_dir / "test.parquet"
+    pq.write_table(table, path)
+    return table, path
+
+
+def _create_directory_of_files(base_dir):
+    import pyarrow.parquet as pq
+    table1 = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5})
+    path1 = base_dir / "test1.parquet"
+    pq.write_table(table1, path1)
+    table2 = pa.table({'a': range(9, 18), 'b': [0.] * 4 + [1.] * 5})
+    path2 = base_dir / "test2.parquet"
+    pq.write_table(table2, path2)
+    return (table1, table2), (path1, path2)
+
+
+def _check_dataset_from_path(path, table, **kwargs):
+    import pathlib
+
+    # pathlib object
+    assert isinstance(path, pathlib.Path)
+    dataset = ds.dataset(ds.source(path, **kwargs))
+    assert dataset.schema.equals(table.schema, check_metadata=False)
+    result = dataset.new_scan().finish().to_table()
+    assert result.replace_schema_metadata().equals(table)
+
+    # string path
+    dataset = ds.dataset(ds.source(str(path), **kwargs))
+    assert dataset.schema.equals(table.schema, check_metadata=False)
+    result = dataset.new_scan().finish().to_table()
+    assert result.replace_schema_metadata().equals(table)
+
+    # passing directly to dataset
+    dataset = ds.dataset(str(path), **kwargs)
+    assert dataset.schema.equals(table.schema, check_metadata=False)
+    result = dataset.new_scan().finish().to_table()
+    assert result.replace_schema_metadata().equals(table)
+
+
+@pytest.mark.parquet
+def test_open_dataset_single_file(tempdir):
+    table, path = _create_single_file(tempdir)
+    _check_dataset_from_path(path, table)
+
+
+@pytest.mark.parquet
+def test_open_dataset_directory(tempdir):
+    tables, _ = _create_directory_of_files(tempdir)
+    table = pa.concat_tables(tables)
+    _check_dataset_from_path(tempdir, table)
+
+
+@pytest.mark.parquet
+def test_open_dataset_list_of_files(tempdir):
+    tables, (path1, path2) = _create_directory_of_files(tempdir)
+    table = pa.concat_tables(tables)
+
+    # list of exact files needs to be passed to source() function
+    # (dataset() will interpret it as separate sources)
+    for dataset in [
+            ds.dataset(ds.source([path1, path2])),
+            ds.dataset(ds.source([str(path1), str(path2)]))]:
+        assert dataset.schema.equals(table.schema, check_metadata=False)
+        result = dataset.new_scan().finish().to_table()
+        assert result.replace_schema_metadata().equals(table)
+
+
+@pytest.mark.parquet
+def test_open_dataset_partitioned_directory(tempdir):
+    import pyarrow.parquet as pq
+    table = pa.table({'a': range(9), 'b': [0.] * 4 + [1.] * 5})
+    for part in range(3):
+        path = tempdir / "part={0}".format(part)
+        path.mkdir()
+        pq.write_table(table, path / "test.parquet")
+
+    # no partitioning specified, just read all individual files
+    full_table = pa.concat_tables([table] * 3)
+    _check_dataset_from_path(tempdir, full_table)
+
+    # specify partition scheme with discovery
+    dataset = ds.dataset(
+        str(tempdir), partitioning=ds.partitioning(flavor="hive"))
+    expected_schema = table.schema.append(pa.field("part", pa.int32()))
+    assert dataset.schema.equals(expected_schema, check_metadata=False)
+
+    # specify partition scheme with string short-cut
+    dataset = ds.dataset(str(tempdir), partitioning="hive")
+    assert dataset.schema.equals(expected_schema, check_metadata=False)
+
+    # specify partition scheme with explicit scheme
+    dataset = ds.dataset(
+        str(tempdir),
+        partitioning=ds.partitioning(
+            pa.schema([("part", pa.int8())]), flavor="hive"))
+    expected_schema = table.schema.append(pa.field("part", pa.int8()))
+    assert dataset.schema.equals(expected_schema, check_metadata=False)
+
+    result = dataset.new_scan().finish().to_table()
+    expected = full_table.append_column(
+        "part", pa.array(np.repeat([0, 1, 2], 9), type=pa.int8()))
+    assert result.replace_schema_metadata().equals(expected)
+
+
+@pytest.mark.parquet
+def test_open_dataset_filesystem(tempdir):
+    # single file
+    table, path = _create_single_file(tempdir)
+
+    # filesystem inferred from path
+    dataset1 = ds.dataset(str(path))
+    assert dataset1.schema.equals(table.schema, check_metadata=False)
+
+    # filesystem specified
+    dataset2 = ds.dataset(str(path), filesystem=fs.LocalFileSystem())
+    assert dataset2.schema.equals(table.schema, check_metadata=False)
+
+    # passing different filesystem
+    with pytest.raises(FileNotFoundError):
+        ds.dataset(str(path), filesystem=fs._MockFileSystem())
+
+
+def test_open_dataset_unsupported_format(tempdir):
+    _, path = _create_single_file(tempdir)
+    with pytest.raises(ValueError, match="format 'blabla' is not supported"):
+        ds.dataset([path], format="blabla")
+
+
+def test_open_dataset_from_source_additional_kwargs(tempdir):
+    _, path = _create_single_file(tempdir)
+    with pytest.raises(ValueError, match="cannot pass any additional"):
+        ds.dataset(ds.source(path), format="parquet")
+
+
+def test_open_dataset_validate_sources(tempdir):
+    _, path = _create_single_file(tempdir)
+    dataset = ds.dataset(path)
+    with pytest.raises(ValueError,
                       match="Expected a path-like or Source, got"):
+        ds.dataset([dataset])
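
A minimal usage sketch of the API introduced by this diff (not part of the patch itself). It assumes pyarrow is built with the dataset and parquet extensions and that a local directory of Hive-partitioned parquet files exists at the hypothetical path "data/" (e.g. data/year=2020/month=1/part-0.parquet); the scan chain mirrors the one used in _check_dataset_from_path above.

    # Usage sketch for the new pyarrow.dataset functions added in this diff.
    # The "data/" layout is a hypothetical example, not created by the patch.
    import pyarrow as pa
    import pyarrow.dataset as ds

    # Explicit Hive-style partitioning with typed partition fields.
    part = ds.partitioning(
        pa.schema([("year", pa.int16()), ("month", pa.int8())]),
        flavor="hive")

    # Open the directory as a dataset; the filesystem is inferred from the path.
    dataset = ds.dataset("data/", format="parquet", partitioning=part)

    # Materialize the dataset the same way the tests above do.
    table = dataset.new_scan().finish().to_table()
    print(table.schema)

Passing partitioning="hive" instead of an explicit scheme would let the partition field types be inferred, as exercised in test_open_dataset_partitioned_directory.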