diff --git a/cpp/src/arrow/dataset/file_parquet.cc b/cpp/src/arrow/dataset/file_parquet.cc
index e036c4826c8..dd264f511ca 100644
--- a/cpp/src/arrow/dataset/file_parquet.cc
+++ b/cpp/src/arrow/dataset/file_parquet.cc
@@ -511,7 +511,25 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
       row_groups_(std::move(row_groups)),
       parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
       has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
-                             physical_schema_ != nullptr) {}
+                             physical_schema_ != nullptr) {
+  // An empty row_groups_ indicates selection of all row groups in the file, in
+  // which case a reader must be opened to determine the real count.
+  if (!row_groups_.empty()) {
+    num_row_groups_ = static_cast<int>(row_groups_.size());
+  }
+}
+
+Result<int> ParquetFileFragment::GetNumRowGroups() {
+  auto lock = physical_schema_mutex_.Lock();
+  if (num_row_groups_ == -1) {
+    ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
+    num_row_groups_ = reader->num_row_groups();
+    if (row_groups_.empty()) {
+      row_groups_ = RowGroupInfo::FromCount(num_row_groups_);
+    }
+  }
+  return num_row_groups_;
+}
 
 Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
   if (HasCompleteMetadata()) {
@@ -541,6 +559,7 @@ Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* r
 
   int num_row_groups = metadata->num_row_groups();
   if (row_groups_.empty()) {
+    num_row_groups_ = num_row_groups;
     row_groups_ = RowGroupInfo::FromCount(num_row_groups);
   }
 
diff --git a/cpp/src/arrow/dataset/file_parquet.h b/cpp/src/arrow/dataset/file_parquet.h
index 939fdc53687..576e370a499 100644
--- a/cpp/src/arrow/dataset/file_parquet.h
+++ b/cpp/src/arrow/dataset/file_parquet.h
@@ -216,6 +216,9 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
   /// represents all RowGroups in the parquet file.
   const std::vector<RowGroupInfo>& row_groups() const { return row_groups_; }
 
+  /// \brief Return the number of row groups selected by this fragment.
+  Result<int> GetNumRowGroups();
+
   /// \brief Indicate if the attached statistics are complete and the physical schema
   /// is cached.
   ///
@@ -244,6 +247,7 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
   std::vector<RowGroupInfo> row_groups_;
   ParquetFileFormat& parquet_format_;
   bool has_complete_metadata_;
+  int num_row_groups_ = -1;
 
   friend class ParquetFileFormat;
 };
diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 3b8621f9709..c739f85cb91 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -962,6 +962,14 @@ cdef class ParquetFileFragment(FileFragment):
             return None
         return [RowGroupInfo.wrap(row_group) for row_group in c_row_groups]
 
+    @property
+    def num_row_groups(self):
+        """
+        Return the number of row groups viewed by this fragment (not the
+        number of row groups in the original file).
+ """ + return GetResultValue(self.parquet_file_fragment.GetNumRowGroups()) + def split_by_row_group(self, Expression filter=None, Schema schema=None): """ diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index 3e7ca1d5752..589551aafd1 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -240,6 +240,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: cdef cppclass CParquetFileFragment "arrow::dataset::ParquetFileFragment"( CFileFragment): const vector[CRowGroupInfo]& row_groups() const + CResult[int] GetNumRowGroups() CResult[vector[shared_ptr[CFragment]]] SplitByRowGroup( shared_ptr[CExpression] predicate) CStatus EnsureCompleteMetadata() diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index c1aa87d0c47..371f5d8b460 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -247,12 +247,14 @@ def test_filesystem_dataset(mockfs): assert isinstance(fragment.format, ds.ParquetFileFormat) assert isinstance(fragment, ds.ParquetFileFragment) assert fragment.row_groups is None + assert fragment.num_row_groups == 1 row_group_fragments = list(fragment.split_by_row_group()) - assert len(row_group_fragments) == 1 + assert fragment.num_row_groups == len(row_group_fragments) == 1 assert isinstance(row_group_fragments[0], ds.ParquetFileFragment) assert row_group_fragments[0].path == path assert row_group_fragments[0].row_groups == [ds.RowGroupInfo(0)] + assert row_group_fragments[0].num_row_groups == 1 fragments = list(dataset.get_fragments(filter=ds.field("const") == 0)) assert len(fragments) == 2 @@ -599,14 +601,17 @@ def test_make_fragment(multisourcefs): for path in dataset.files: fragment = parquet_format.make_fragment(path, multisourcefs) + assert fragment.row_groups is None + assert fragment.num_row_groups == 1 + row_group_fragment = parquet_format.make_fragment(path, multisourcefs, row_groups=[0]) for f in [fragment, row_group_fragment]: assert isinstance(f, ds.ParquetFileFragment) assert f.path == path assert isinstance(f.filesystem, type(multisourcefs)) - assert fragment.row_groups is None assert row_group_fragment.row_groups == [ds.RowGroupInfo(0)] + assert row_group_fragment.num_row_groups == 1 def test_make_csv_fragment_from_buffer(): @@ -807,13 +812,14 @@ def test_fragments_parquet_row_groups(tempdir): # list and scan row group fragments row_group_fragments = list(fragment.split_by_row_group()) - assert len(row_group_fragments) == 2 + assert len(row_group_fragments) == fragment.num_row_groups == 2 result = row_group_fragments[0].to_table(schema=dataset.schema) assert result.column_names == ['f1', 'f2', 'part'] assert len(result) == 2 assert result.equals(table.slice(0, 2)) assert row_group_fragments[0].row_groups is not None + assert row_group_fragments[0].num_row_groups == 1 assert row_group_fragments[0].row_groups[0].statistics == { 'f1': {'min': 0, 'max': 1}, 'f2': {'min': 1, 'max': 1}, @@ -826,6 +832,26 @@ def test_fragments_parquet_row_groups(tempdir): assert len(result) == 1 +@pytest.mark.parquet +def test_parquet_fragment_num_row_groups(tempdir): + import pyarrow.parquet as pq + + table = pa.table({'a': range(8)}) + pq.write_table(table, tempdir / "test.parquet", row_group_size=2) + dataset = ds.dataset(tempdir / "test.parquet", format="parquet") + original_fragment = list(dataset.get_fragments())[0] + + # create fragment with subset of row groups + fragment = 
+        original_fragment.path, original_fragment.filesystem,
+        row_groups=[1, 3])
+    assert fragment.num_row_groups == 2
+    # ensure that parsing metadata preserves correct number of row groups
+    fragment.ensure_complete_metadata()
+    assert fragment.num_row_groups == 2
+    assert len(fragment.row_groups) == 2
+
+
 @pytest.mark.pandas
 @pytest.mark.parquet
 def test_fragments_parquet_row_groups_dictionary(tempdir):
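
Usage sketch (not part of the patch): a minimal example of how the new num_row_groups property behaves once this change is applied, mirroring the semantics exercised by the tests above. The file path and row-group sizing are illustrative.

    import pyarrow as pa
    import pyarrow.dataset as ds
    import pyarrow.parquet as pq

    # Write a file with four row groups (8 rows, 2 per group); path is illustrative.
    table = pa.table({'a': range(8)})
    pq.write_table(table, "/tmp/example.parquet", row_group_size=2)

    dataset = ds.dataset("/tmp/example.parquet", format="parquet")
    fragment = list(dataset.get_fragments())[0]

    # No explicit row-group selection: row_groups is None, so the count is
    # resolved lazily by opening a reader on first access.
    assert fragment.row_groups is None
    assert fragment.num_row_groups == 4

    # Each split fragment views exactly one row group of the original file.
    for piece in fragment.split_by_row_group():
        assert piece.num_row_groups == 1

The lazy path matters because fragments are often constructed from a dataset listing without touching the files; the count is cached in num_row_groups_ so the reader is opened at most once per fragment.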