260 changes: 260 additions & 0 deletions python/pyarrow/dataset.py
@@ -21,6 +21,10 @@

import sys

import pyarrow as pa
from pyarrow.util import _stringify_path, _is_path_like


if sys.version_info < (3,):
raise ImportError("Python Dataset bindings require Python 3")

@@ -53,3 +57,259 @@
Source,
TreeSource,
)


def partitioning(field_names=None, flavor=None):
"""
Specify a partitioning scheme.

The supported schemes include:

- "DirectoryPartitioning": this scheme expects one segment in the file path
for each field in the specified schema (all fields are required to be
present). For example, given schema<year:int16, month:int8>, the path
"/2009/11" would be parsed to ("year"_ == 2009 and "month"_ == 11).
- "HivePartitioning": a scheme for "/$key=$value/" nested directories as
found in Apache Hive. This is a multi-level, directory-based partitioning
scheme. Data is partitioned by static values of a particular column in
the schema. Partition keys are represented in the form $key=$value in
directory names. Field order is ignored, as are missing or unrecognized
field names.
For example, given schema<year:int16, month:int8, day:int8>, a possible
path would be "/year=2009/month=11/day=15" (but the field order does not
need to match).

Parameters
----------
field_names : pyarrow.Schema or list of str
Member:

I don't think an argument named field_names should accept a schema. Please move it to a separate argument or maybe rename to schema_or_field_names

Member Author:

Yeah, I agree. @kszucs combined them in his prototype. @kszucs, would you be fine with having separate keyword arguments for schema and field_names?
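For illustration, a minimal sketch of the separated-keyword variant floated in this thread (hypothetical; `schema` and `field_names` as distinct arguments are only the options discussed above, not the API in this patch):

def partitioning(schema=None, field_names=None, flavor=None):
    # Hypothetical sketch: accept a Schema and a list of field names as
    # separate keyword arguments instead of one combined parameter.
    if flavor is None:
        if schema is not None:
            return DirectoryPartitioning(schema)
        elif field_names is not None:
            return DirectoryPartitioning.discover(field_names)
        raise ValueError("pass either 'schema' or 'field_names'")
    elif flavor == 'hive':
        if schema is not None:
            return HivePartitioning(schema)
        return HivePartitioning.discover()
    raise ValueError("Unsupported flavor")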

The schema that describes the partitions present in the file path. If
a list of strings (field names) is passed, the schema's types are
inferred from the file paths (only valid for DirectoryPartitioning).
flavor : str, default None
The default is DirectoryPartitioning. Specify ``flavor="hive"`` for
a HivePartitioning.

Returns
-------
Partitioning or PartitioningFactory

Examples
--------

Specify the Schema for paths like "/2009/June":

>>> partitioning(pa.schema([("year", pa.int16()), ("month", pa.string())]))

or let the types be inferred by only specifying the field names:

>>> partitioning(["year", "month"])

For paths like "/2009/June", the year will be inferred as int32 while month
will be inferred as string.

Create a Hive scheme for a path like "/year=2009/month=11":

>>> partitioning(
... pa.schema([("year", pa.int16()), ("month", pa.int8())]),
... flavor="hive")

A Hive scheme can also be discovered from the directory structure (and
types will be inferred):

>>> partitioning(flavor="hive")

"""
if flavor is None:
# default flavor
if isinstance(field_names, pa.Schema):
return DirectoryPartitioning(field_names)
elif isinstance(field_names, list):
return DirectoryPartitioning.discover(field_names)
elif field_names is None:
raise ValueError(
"For the default directory flavor, need to specify "
"'field_names' as Schema or list of field names")
else:
raise ValueError(
"Expected Schema or list of field names, got {0}".format(
type(field_names)))
elif flavor == 'hive':
if isinstance(field_names, pa.Schema):
return HivePartitioning(field_names)
elif field_names is None:
return HivePartitioning.discover()
else:
raise ValueError(
"Expected Schema or None for 'field_names', got {0}".format(
type(field_names)))
else:
raise ValueError("Unsupported flavor")


def _ensure_fs_and_paths(path_or_paths, filesystem=None):
# Validate and convert the path-likes and filesystem.
# Returns filesystem and list of string paths or FileSelector
from pyarrow.fs import FileSystem, FileType, FileSelector

if isinstance(path_or_paths, list):
paths_or_selector = [_stringify_path(path) for path in path_or_paths]
if filesystem is None:
# infer from first path
filesystem, _ = FileSystem.from_uri(paths_or_selector[0])
else:
path = _stringify_path(path_or_paths)
if filesystem is None:
filesystem, path = FileSystem.from_uri(path)

stats = filesystem.get_target_stats([path])[0]
if stats.type == FileType.Directory:
# for directory, pass a selector
paths_or_selector = FileSelector(path, recursive=True)
elif stats.type == FileType.File:
# for a single file path, pass it as a list
paths_or_selector = [path]
else:
raise FileNotFoundError(path)

return filesystem, paths_or_selector


def _ensure_partitioning(scheme):
# Validate input and return a Partitioning(Factory) or passthrough None
# for no partitioning
if scheme is None:
pass
elif isinstance(scheme, str):
scheme = partitioning(flavor=scheme)
elif isinstance(scheme, (Partitioning, PartitioningFactory)):
pass
else:
        raise ValueError(
            "Expected Partitioning or PartitioningFactory, got {0}".format(
                type(scheme)))
return scheme


def source(path_or_paths, filesystem=None, partitioning=None,
format=None):
"""
Open a (multi-file) data source.

Parameters
----------
path_or_paths : str, pathlib.Path, or list of those
Path to a file or to a directory containing the data files, or
a list of paths.
filesystem : FileSystem, default None
By default will be inferred from the path.
partitioning : Partitioning(Factory) or str
The partitioning scheme specified with the ``partitioning()``
function. A flavor string can be used as shortcut.
format : str
Currently only "parquet" is supported.

Returns
-------
    Source or SourceFactory
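
    Examples
    --------
    Opening a single directory of parquet files (an illustrative call,
    mirroring the ``dataset()`` examples below):

    >>> source("path/to/nyc-taxi/", format="parquet")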

"""
filesystem, paths_or_selector = _ensure_fs_and_paths(
path_or_paths, filesystem)

partitioning = _ensure_partitioning(partitioning)

format = format or "parquet"
if format == "parquet":
format = ParquetFileFormat()
elif not isinstance(format, FileFormat):
raise ValueError("format '{0}' is not supported".format(format))

# TODO pass through options
options = FileSystemFactoryOptions()

if isinstance(partitioning, PartitioningFactory):
options.partitioning_factory = partitioning
elif isinstance(partitioning, Partitioning):
options.partitioning = partitioning

discovery = FileSystemSourceFactory(
filesystem, paths_or_selector, format, options)

# TODO return Source if a specific schema was passed?

# need to return SourceFactory since `dataset` might need to
# finish the factory with a unified schema
return discovery


def _ensure_source(src, filesystem=None, partitioning=None, format=None):
if _is_path_like(src):
src = source(src, filesystem=filesystem,
partitioning=partitioning, format=format)
# TODO also accept Source?
elif isinstance(src, FileSystemSourceFactory):
# when passing a SourceFactory, the arguments cannot be specified
if any(kwarg is not None
for kwarg in [filesystem, partitioning, format]):
raise ValueError(
"When passing a Source(Factory), you cannot pass any "
"additional arguments")
else:
raise ValueError("Expected a path-like or Source, got {0}".format(
type(src)))
return src


def dataset(sources, filesystem=None, partitioning=None, format=None):
"""
Open a (multi-source) dataset.

Parameters
----------
sources : path or list of paths or sources
Path to a file or to a directory containing the data files, or a list
of paths for a multi-source dataset. To have more control, a list of
sources can be passed, created with the ``source()`` function (in this
case, the additional keywords will be ignored).
filesystem : FileSystem, default None
By default will be inferred from the path.
partitioning : Partitioning(Factory) or str
The partitioning scheme specified with the ``partitioning()``
function. A flavor string can be used as shortcut.
format : str
Currently only "parquet" is supported.

Returns
-------
Dataset

Examples
--------
Opening a dataset for a single directory:

>>> dataset("path/to/nyc-taxi/", format="parquet")

Combining different sources:

>>> dataset([
... source("s3://old-taxi-data", format="parquet"),
... source("local/path/to/new/data", format="csv")
... ])

"""
if not isinstance(sources, list):
sources = [sources]
sources = [
_ensure_source(src, filesystem=filesystem,
partitioning=partitioning, format=format)
for src in sources
]

# TEMP: for now only deal with a single source

if len(sources) > 1:
raise NotImplementedError("only a single source is supported for now")

discovery = sources[0]
inspected_schema = discovery.inspect()
return Dataset([discovery.finish()], inspected_schema)
Member:

I wonder how we should express support for not just one, but multiple data sources.
For example, opening a dataset with a source from s3 and a local one.

open_dataset(['s3://bucket/base/path', 'local://base/path'])

We'll probably need more functions, but I cannot think of a meaningful name right now.

Member:

Or, in this case, will we simply require constructing the data sources and the dataset manually?

Member Author:

I personally think this advanced use case does not necessarily need to be handled by such a higher-level function (not sure how the dataset discovery in C++ will handle this), so it would indeed be up to the user to construct the data sources manually.

Member:

I agree with Joris.
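
For reference, a minimal sketch of that manual route using the source()/dataset() helpers from this patch (illustrative only; the bucket and paths are placeholders, and the patch as written still raises NotImplementedError for more than one source):

from pyarrow import dataset as ds

# Hypothetical usage once multi-source support lands: construct each
# source explicitly, then combine them into one dataset.
s3_src = ds.source("s3://bucket/base/path", format="parquet")
local_src = ds.source("local/base/path", format="parquet")
combined = ds.dataset([s3_src, local_src])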
