diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index a02dd56c9b1..e75930a1bc4 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -27,6 +27,7 @@
 import os
 import re
 import six
+import operator
 
 import pyarrow as pa
 import pyarrow.lib as lib
@@ -142,15 +143,17 @@ def _build_nested_paths(self):
         result = defaultdict(list)
 
-        def _visit_piece(i, key, rest):
-            result[key].append(i)
+        for i, path in enumerate(paths):
+            key = path[0]
+            rest = path[1:]
+            while True:
+                result[key].append(i)
 
-            if len(rest) > 0:
-                nested_key = '.'.join((key, rest[0]))
-                _visit_piece(i, nested_key, rest[1:])
+                if not rest:
+                    break
 
-        for i, path in enumerate(paths):
-            _visit_piece(i, path[0], path[1:])
+                key = '.'.join((key, rest[0]))
+                rest = rest[1:]
 
         return result
@@ -922,6 +925,11 @@ def _path_split(path, sep):
 EXCLUDED_PARQUET_PATHS = {'_SUCCESS'}
 
 
+class _ParquetDatasetMetadata:
+    __slots__ = ('fs', 'memory_map', 'read_dictionary', 'common_metadata',
+                 'buffer_size')
+
+
 def _open_dataset_file(dataset, path, meta=None):
     if dataset.fs is not None and not isinstance(dataset.fs, LocalFileSystem):
         path = dataset.fs.open(path, mode='rb')
@@ -999,32 +1007,37 @@ def __init__(self, path_or_paths, filesystem=None, schema=None,
                  metadata=None, split_row_groups=False, validate_schema=True,
                  filters=None, metadata_nthreads=1, read_dictionary=None,
                  memory_map=False, buffer_size=0):
+        self._metadata = _ParquetDatasetMetadata()
         a_path = path_or_paths
         if isinstance(a_path, list):
             a_path = a_path[0]
 
-        self.fs, _ = _get_filesystem_and_path(filesystem, a_path)
+        self._metadata.fs, _ = _get_filesystem_and_path(filesystem, a_path)
         if isinstance(path_or_paths, list):
             self.paths = [_parse_uri(path) for path in path_or_paths]
         else:
             self.paths = _parse_uri(path_or_paths)
 
-        self.read_dictionary = read_dictionary
-        self.memory_map = memory_map
-        self.buffer_size = buffer_size
+        self._metadata.read_dictionary = read_dictionary
+        self._metadata.memory_map = memory_map
+        self._metadata.buffer_size = buffer_size
 
         (self.pieces,
          self.partitions,
          self.common_metadata_path,
          self.metadata_path) = _make_manifest(
             path_or_paths, self.fs, metadata_nthreads=metadata_nthreads,
-            open_file_func=partial(_open_dataset_file, self))
+            open_file_func=partial(_open_dataset_file, self._metadata)
+        )
 
         if self.common_metadata_path is not None:
             with self.fs.open(self.common_metadata_path) as f:
-                self.common_metadata = read_metadata(f, memory_map=memory_map)
+                self._metadata.common_metadata = read_metadata(
+                    f,
+                    memory_map=memory_map
+                )
         else:
-            self.common_metadata = None
+            self._metadata.common_metadata = None
 
         if metadata is None and self.metadata_path is not None:
             with self.fs.open(self.metadata_path) as f:
@@ -1171,6 +1184,16 @@ def all_filters_accept(piece):
 
         self.pieces = [p for p in self.pieces if all_filters_accept(p)]
 
+    fs = property(operator.attrgetter('_metadata.fs'))
+    memory_map = property(operator.attrgetter('_metadata.memory_map'))
+    read_dictionary = property(
+        operator.attrgetter('_metadata.read_dictionary')
+    )
+    common_metadata = property(
+        operator.attrgetter('_metadata.common_metadata')
+    )
+    buffer_size = property(operator.attrgetter('_metadata.buffer_size'))
+
 
 def _make_manifest(path_or_paths, fs, pathsep='/', metadata_nthreads=1,
                    open_file_func=None):
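
Illustration only, not part of the patch: the heart of this change is that ParquetDataset hoists its cheap, picklable settings into a slotted holder (_ParquetDatasetMetadata) and re-exposes them as read-only properties built from operator.attrgetter, so that partial(_open_dataset_file, self._metadata) captures only the small holder rather than the whole dataset. A minimal self-contained sketch of that pattern, using the hypothetical names _Meta and Dataset:

import operator


class _Meta:
    # Slotted holder: no per-instance __dict__, so it stays small and
    # pickles cheaply, e.g. when captured by functools.partial and
    # shipped to worker threads or processes.
    __slots__ = ('fs', 'memory_map')


class Dataset:
    def __init__(self, fs=None, memory_map=False):
        self._meta = _Meta()
        self._meta.fs = fs
        self._meta.memory_map = memory_map

    # Read-only delegation: attrgetter('_meta.fs') resolves the dotted
    # path from the Dataset instance to the holder on every access, so
    # the property always reflects the holder's current value.
    fs = property(operator.attrgetter('_meta.fs'))
    memory_map = property(operator.attrgetter('_meta.memory_map'))


ds = Dataset(fs='local', memory_map=True)
assert ds.fs == 'local'
assert ds.memory_map is True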