21 changes: 20 additions & 1 deletion cpp/src/arrow/dataset/file_parquet.cc
@@ -511,7 +511,25 @@ ParquetFileFragment::ParquetFileFragment(FileSource source,
row_groups_(std::move(row_groups)),
parquet_format_(checked_cast<ParquetFileFormat&>(*format_)),
has_complete_metadata_(RowGroupInfosAreComplete(row_groups_) &&
physical_schema_ != nullptr) {}
physical_schema_ != nullptr) {
if (!row_groups_.empty()) {
// A non-empty selection gives the count directly; an empty row_groups_ selects
// all row groups in the file, so the real count requires opening a reader.
num_row_groups_ = static_cast<int>(row_groups_.size());
}
}

Result<int> ParquetFileFragment::GetNumRowGroups() {
auto lock = physical_schema_mutex_.Lock();
if (num_row_groups_ == -1) {
ARROW_ASSIGN_OR_RAISE(auto reader, parquet_format_.GetReader(source_));
num_row_groups_ = reader->num_row_groups();
if (row_groups_.empty()) {
row_groups_ = RowGroupInfo::FromCount(num_row_groups_);
}
}
return num_row_groups_;
}

Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* reader) {
if (HasCompleteMetadata()) {
@@ -541,6 +559,7 @@ Status ParquetFileFragment::EnsureCompleteMetadata(parquet::arrow::FileReader* r
int num_row_groups = metadata->num_row_groups();

if (row_groups_.empty()) {
num_row_groups_ = num_row_groups;
row_groups_ = RowGroupInfo::FromCount(num_row_groups);
}

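Taken together, the constructor and GetNumRowGroups changes make the count lazy: a fragment built with an explicit row-group selection knows its count up front, while a fragment viewing the whole file defers opening a reader until the count is first requested, then caches the result under physical_schema_mutex_. A minimal Python sketch of the resulting behavior, assuming a local Parquet file written with four row groups (the file name and LocalFileSystem usage are illustrative):

import pyarrow as pa
import pyarrow.dataset as ds
import pyarrow.parquet as pq
from pyarrow.fs import LocalFileSystem

# Write four row groups of two rows each.
pq.write_table(pa.table({'a': range(8)}), "example.parquet", row_group_size=2)

fmt = ds.ParquetFileFormat()

# Explicit selection: the count is known without touching the file.
subset = fmt.make_fragment("example.parquet", LocalFileSystem(), row_groups=[1, 3])
assert subset.num_row_groups == 2

# No selection: the first access opens a reader to count, then caches the result.
whole = fmt.make_fragment("example.parquet", LocalFileSystem())
assert whole.num_row_groups == 4
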
4 changes: 4 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.h
@@ -216,6 +216,9 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
/// represents all RowGroups in the parquet file.
const std::vector<RowGroupInfo>& row_groups() const { return row_groups_; }

/// \brief Return the number of row groups selected by this fragment.
Result<int> GetNumRowGroups();

/// \brief Indicate if the attached statistics are complete and the physical schema
/// is cached.
///
@@ -244,6 +247,7 @@ class ARROW_DS_EXPORT ParquetFileFragment : public FileFragment {
std::vector<RowGroupInfo> row_groups_;
ParquetFileFormat& parquet_format_;
bool has_complete_metadata_;
int num_row_groups_ = -1;  // -1 until determined; see GetNumRowGroups()

friend class ParquetFileFormat;
};
8 changes: 8 additions & 0 deletions python/pyarrow/_dataset.pyx
@@ -962,6 +962,14 @@ cdef class ParquetFileFragment(FileFragment):
return None
return [RowGroupInfo.wrap(row_group) for row_group in c_row_groups]

@property
def num_row_groups(self):
"""
Return the number of row groups viewed by this fragment (not the
total number of row groups in the original file).
"""
return GetResultValue(self.parquet_file_fragment.GetNumRowGroups())

def split_by_row_group(self, Expression filter=None,
Schema schema=None):
"""
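The new property composes with the existing split_by_row_group(): each split fragment views exactly one row group, mirroring the assertions in the tests below. A short usage sketch, reusing the illustrative example.parquet file from the sketch above:

import pyarrow.dataset as ds

dataset = ds.dataset("example.parquet", format="parquet")
fragment = list(dataset.get_fragments())[0]

# The parent fragment counts every row group it views.
assert fragment.num_row_groups == 4

# Each per-row-group fragment reports a count of one.
for rg_fragment in fragment.split_by_row_group():
    assert rg_fragment.num_row_groups == 1
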
1 change: 1 addition & 0 deletions python/pyarrow/includes/libarrow_dataset.pxd
@@ -240,6 +240,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
cdef cppclass CParquetFileFragment "arrow::dataset::ParquetFileFragment"(
CFileFragment):
const vector[CRowGroupInfo]& row_groups() const
CResult[int] GetNumRowGroups()
CResult[vector[shared_ptr[CFragment]]] SplitByRowGroup(
shared_ptr[CExpression] predicate)
CStatus EnsureCompleteMetadata()
32 changes: 29 additions & 3 deletions python/pyarrow/tests/test_dataset.py
@@ -247,12 +247,14 @@ def test_filesystem_dataset(mockfs):
assert isinstance(fragment.format, ds.ParquetFileFormat)
assert isinstance(fragment, ds.ParquetFileFragment)
assert fragment.row_groups is None
assert fragment.num_row_groups == 1

row_group_fragments = list(fragment.split_by_row_group())
assert len(row_group_fragments) == 1
assert fragment.num_row_groups == len(row_group_fragments) == 1
assert isinstance(row_group_fragments[0], ds.ParquetFileFragment)
assert row_group_fragments[0].path == path
assert row_group_fragments[0].row_groups == [ds.RowGroupInfo(0)]
assert row_group_fragments[0].num_row_groups == 1

fragments = list(dataset.get_fragments(filter=ds.field("const") == 0))
assert len(fragments) == 2
@@ -599,14 +601,17 @@ def test_make_fragment(multisourcefs):

for path in dataset.files:
fragment = parquet_format.make_fragment(path, multisourcefs)
assert fragment.row_groups is None
assert fragment.num_row_groups == 1

row_group_fragment = parquet_format.make_fragment(path, multisourcefs,
row_groups=[0])
for f in [fragment, row_group_fragment]:
assert isinstance(f, ds.ParquetFileFragment)
assert f.path == path
assert isinstance(f.filesystem, type(multisourcefs))
assert fragment.row_groups is None
assert row_group_fragment.row_groups == [ds.RowGroupInfo(0)]
assert row_group_fragment.num_row_groups == 1


def test_make_csv_fragment_from_buffer():
@@ -807,13 +812,14 @@ def test_fragments_parquet_row_groups(tempdir):

# list and scan row group fragments
row_group_fragments = list(fragment.split_by_row_group())
assert len(row_group_fragments) == 2
assert len(row_group_fragments) == fragment.num_row_groups == 2
result = row_group_fragments[0].to_table(schema=dataset.schema)
assert result.column_names == ['f1', 'f2', 'part']
assert len(result) == 2
assert result.equals(table.slice(0, 2))

assert row_group_fragments[0].row_groups is not None
assert row_group_fragments[0].num_row_groups == 1
assert row_group_fragments[0].row_groups[0].statistics == {
'f1': {'min': 0, 'max': 1},
'f2': {'min': 1, 'max': 1},
@@ -826,6 +832,26 @@
assert len(result) == 1


@pytest.mark.parquet
def test_parquet_fragment_num_row_groups(tempdir):
import pyarrow.parquet as pq

table = pa.table({'a': range(8)})
pq.write_table(table, tempdir / "test.parquet", row_group_size=2)
dataset = ds.dataset(tempdir / "test.parquet", format="parquet")
original_fragment = list(dataset.get_fragments())[0]

# create fragment with subset of row groups
fragment = original_fragment.format.make_fragment(
original_fragment.path, original_fragment.filesystem,
row_groups=[1, 3])
assert fragment.num_row_groups == 2
# ensure that parsing metadata preserves correct number of row groups
fragment.ensure_complete_metadata()
assert fragment.num_row_groups == 2
assert len(fragment.row_groups) == 2


@pytest.mark.pandas
@pytest.mark.parquet
def test_fragments_parquet_row_groups_dictionary(tempdir):