
Add Parquet column index filtering to Iceberg #13584

Closed
electrum wants to merge 3 commits into trinodb:master from electrum:iceberg-parquet

Conversation

@electrum
Member

@electrum electrum commented Aug 10, 2022

Description

Is this change a fix, improvement, new feature, refactoring, or other?

improvement

Related issues, pull requests, and links

Fixes #11000

Documentation

(x) No documentation is needed.

Release notes

(x) Release notes entries required with the following suggested text:

# Iceberg connector
* Improve performance of querying Parquet data for files containing column indexes. ({issue}`13584`)

@ebyhr
Member

ebyhr commented Aug 10, 2022

There's an existing PR #12977. I don't mind closing my PR, but we probably need to find a way to add tests in this PR.

@raunaqmorarka
Member

The native parquet writer doesn't produce page indexes, and that's the only writer in the iceberg connector, so we won't benefit from this unless another engine writes the page indexes.
This PR is missing tests with page indexes in the iceberg connector. Maybe we can produce a file with page indexes offline and use that for testing?
Are we impacted by any of the problems mentioned in apache/iceberg#193 ?
Note that we rely on parquet filter APIs in our parquet reader to get the row ranges to be read from column index (ColumnIndexFilter.calculateRowRanges).
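To illustrate what the column index gives the reader, here is a simplified, self-contained sketch — the `Page` record and `matchingRowRanges` helper are hypothetical stand-ins for what `ColumnIndexFilter.calculateRowRanges` derives from the real column and offset indexes:

```java
import java.util.ArrayList;
import java.util.List;

public class PageIndexPruning {
    // Hypothetical per-page stats: min/max value plus the page's row span.
    record Page(long min, long max, long firstRow, long rowCount) {}

    // Returns [start, end) row ranges for pages whose min/max could contain
    // the predicate value; all other pages can be skipped entirely.
    static List<long[]> matchingRowRanges(List<Page> pages, long value) {
        List<long[]> ranges = new ArrayList<>();
        for (Page p : pages) {
            if (p.min() <= value && value <= p.max()) {
                ranges.add(new long[] {p.firstRow(), p.firstRow() + p.rowCount()});
            }
        }
        return ranges;
    }

    public static void main(String[] args) {
        List<Page> pages = List.of(
                new Page(1, 100, 0, 1000),
                new Page(101, 200, 1000, 1000),
                new Page(201, 300, 2000, 1000));
        // Only the middle page can contain 150, so only rows [1000, 2000) are read.
        for (long[] r : matchingRowRanges(pages, 150)) {
            System.out.println(r[0] + "-" + r[1]); // prints "1000-2000"
        }
    }
}
```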

ImmutableList.Builder<Optional<ColumnIndexStore>> columnIndexes = ImmutableList.builder();
for (BlockMetaData block : parquetMetadata.getBlocks()) {
long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
Optional<ColumnIndexStore> columnIndex = getColumnIndexStore(dataSource, block, descriptorsByPath, parquetTupleDomain, options);
Member


Although we won't read the column index from the file until later, it would be better to avoid creating columnIndex until it's needed (after the start <= firstDataPage && firstDataPage < start + length check). We can make the same change in ParquetPageSourceFactory as well.
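A minimal sketch of the suggested deferral (hypothetical names; the lookup counter only exists here to demonstrate that the expensive call is skipped for blocks outside the split):

```java
import java.util.Optional;

public class LazyColumnIndex {
    // Counts calls to the stand-in for the expensive ColumnIndexStore lookup.
    static int lookups = 0;

    static Optional<String> getColumnIndexStore() {
        lookups++;
        return Optional.of("index");
    }

    public static void main(String[] args) {
        long start = 0;
        long length = 100;
        long[] firstDataPages = {50, 500}; // one block inside the split, one outside

        for (long firstDataPage : firstDataPages) {
            // Defer the lookup until we know the block overlaps this split.
            if (start <= firstDataPage && firstDataPage < start + length) {
                Optional<String> columnIndex = getColumnIndexStore();
                System.out.println("read block with index " + columnIndex.orElse("none"));
            }
        }
        System.out.println("lookups=" + lookups); // prints "lookups=1"
    }
}
```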

Member Author


This will change the indentation of the block and make the diff harder to read, and is unrelated to the Iceberg change, so let's do that as a follow up.

@ebyhr
Member

ebyhr commented Aug 10, 2022

I updated 3a5a15a that includes a generated Parquet file. Please feel free to pick up the commit.

@osscm
Contributor

osscm commented Sep 28, 2022

@electrum thanks for helping to add this feature!
Wondering if we are planning to add this in the next release. Thanks.

@osscm
Contributor

osscm commented Oct 13, 2022

I updated 3a5a15a that includes a generated Parquet file. Please feel free to pick up the commit.

thanks @ebyhr!

Looks like you have a test case with the generated file as well: https://github.com/trinodb/trino/blob/3a5a15a53ba5e639287d073f661945672f5f6bc3/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergParquetPageSkipping.java

Are there any other tests that need to be added? I'd be happy to work on them.

@osscm
Contributor

osscm commented Oct 14, 2022

I'm trying to backport this to 391 and test it.
I tried to run the tests; a few of them were failing. There might be some incompatibility with the 391 Parquet reader/writer.

@raunaqmorarka
Member

I'm trying to backport this to 391 and test it. I tried to run the tests; a few of them were failing. There might be some incompatibility with the 391 Parquet reader/writer.

Those tests fail because the native Trino parquet writer used by the iceberg connector does not write page indexes to files yet.
The main blocker here is a resolution to apache/iceberg#193
In practice I have not found page indexes to improve performance much, because page-level min/max indexes are not very selective unless the data is sorted. If the data is sorted, then row group pruning already provides most of the benefit.
When multiple columns are read with a selective predicate on one of the columns, the lazy loading of blocks in the engine already allows the orc/parquet readers to skip decoding the filtered rows of the remaining columns after the selective predicate is applied.
Though page indexes may eliminate reads from S3 for a subset of the parquet pages in a column chunk, in practice reads from nearby positions in a file are often coalesced by the orc/parquet reader to avoid making multiple small reads from S3, which often limits the benefit to just saving decompression of the eliminated pages.
Reading page indexes also incurs additional S3 requests, because they are not part of the footer; the footer only has references to them. So on the whole, page indexes do not significantly improve performance, even though they sound like a valuable feature in theory.

@osscm
Contributor

osscm commented Oct 15, 2022

I'm trying to backport this to 391 and test it. […]

Those tests fail because the native Trino parquet writer used by the iceberg connector does not write page indexes to files yet. […]

Thanks, @raunaqmorarka for the detailed response!
In that case, since Bloom filters are supported by Iceberg+Spark for the Parquet file format, it would be worth supporting them in Trino reads as well. Bloom filters can provide better performance in some cases, especially for high-cardinality columns.

@raunaqmorarka
Member

Thanks, @raunaqmorarka for the detailed response! […]

There is an open PR about that #14428

@mosabua
Member

mosabua commented Jan 12, 2024

@electrum @raunaqmorarka @ebyhr @osscm .. is this still in progress or replaced by some other work?

@mwong77
Contributor

mwong77 commented May 20, 2024

After cherry-picking the commits in this PR, I want to bring up an issue that was discovered. I performed the following steps:

  1. Create an iceberg table partitioned by a single column and insert some initial data rows.
  2. Add a new partition column to the iceberg table (partition spec evolution) and insert some more data rows.
  3. Run a simple SELECT query using the new partition column in a filter predicate.

I observed that no rows were returned by the query run in step 3. Digging a bit more into the Trino code, I can see the following:

In the IcebergPageSourceProvider class, the filter predicate with the new partition column gets treated as an unenforced predicate, even though the iceberg table defines the column used in the filter as a partition column. Ideally, the partitioning column should be an enforced predicate (from my current understanding). The reason this partition column is not treated as an enforced predicate is that the canEnforceColumnConstraintInSpecs function only returns true and adds it to the enforced predicates list if all iceberg partitioning specs contain the new partitioning column from the filter predicate. In my table, I have two partitioning specs due to evolving the iceberg schema, and the partition column in my filter predicate was only added to the second iceberg partitioning spec. Trino then constructs the column index object based on the columns in the unenforced predicate (effectivePredicate was used to construct the parquetTupleDomain, which is used to construct the columnIndex).

After the column index object is constructed and Trino uses it to match the different blocks in the parquet file, Trino proceeds to configure the fields of the ParquetReader here. Looking closer, the partition column is not added to the parquetColumnFieldsBuilder, because the partition if-condition here is executed first and skips the branch here that would add the column to parquetColumnFieldsBuilder. As a result, when the ParquetReader class filters row ranges in the Parquet blocks here, the function returns no rows because of the following code in the ColumnIndexFilter class, which is called here:

// In ColumnIndexFilter class of hive parquet jar
public static RowRanges calculateRowRanges(FilterCompat.Filter filter, final ColumnIndexStore columnIndexStore, final Set<ColumnPath> paths, final long rowCount) {
    return (RowRanges)filter.accept(new FilterCompat.Visitor<RowRanges>() {
      public RowRanges visit(FilterCompat.FilterPredicateCompat filterPredicateCompat) {
        try {
          return (RowRanges) filterPredicateCompat.getFilterPredicate().accept(new ColumnIndexFilter(columnIndexStore, paths, rowCount));
        }
        catch (ColumnIndexStore.MissingOffsetIndexException e) {
          ColumnIndexFilter.LOGGER.info(e.getMessage());
          return RowRanges.createSingle(rowCount);
        }
      }

      public RowRanges visit(FilterCompat.UnboundRecordFilterCompat unboundRecordFilterCompat) {
        return RowRanges.createSingle(rowCount);
      }

      public RowRanges visit(FilterCompat.NoOpFilter noOpFilter) {
        return RowRanges.createSingle(rowCount);
      }
    });
  }

public <T extends Comparable<T>, U extends UserDefinedPredicate<T>> RowRanges visit(Operators.UserDefined<T, U> udp) {  
  return this.applyPredicate(udp.getColumn(), (ci) -> {  
    return (PrimitiveIterator.OfInt)ci.visit(udp);  
  }, udp.getUserDefinedPredicate().acceptsNullValue() ? this.allRows() : RowRanges.EMPTY);  
}


private RowRanges applyPredicate(Operators.Column<?> column, Function<ColumnIndex, PrimitiveIterator.OfInt> func, RowRanges rangesForMissingColumns) {  
  ColumnPath columnPath = column.getColumnPath();  
  if (!this.columns.contains(columnPath)) {  
    return rangesForMissingColumns;  
  } else {  
    OffsetIndex oi = this.columnIndexStore.getOffsetIndex(columnPath);  
    ColumnIndex ci = this.columnIndexStore.getColumnIndex(columnPath);  
    if (ci == null) {  
      LOGGER.info("No column index for column {} is available; Unable to filter on this column", columnPath);  
      return this.allRows();  
    } else {  
      return RowRanges.create(this.rowCount, (PrimitiveIterator.OfInt)func.apply(ci), oi);  
    }  
  }  
}

In the above code, udp contains the user-defined predicate (the filter predicate using the partition column), and this.columns contains the list of columns added to parquetColumnFieldsBuilder in the IcebergPageSourceProvider class. Since parquetColumnFieldsBuilder does not contain the partition column from the filter predicate, return rangesForMissingColumns; is executed, and since my user-defined predicate does not accept null values, RowRanges.EMPTY is returned. Thus, zero rows are returned when a new partition column is added to an iceberg table and a query is then run that uses the new partition column in a filter predicate.
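The failure mode described above can be modeled with a self-contained sketch. This is simplified stand-in logic, not the real parquet-mr classes: applyPredicate here only mimics the missing-column branch of ColumnIndexFilter.applyPredicate.

```java
import java.util.Set;

public class MissingColumnRowRanges {
    // Simplified model of ColumnIndexFilter.applyPredicate: when the predicate's
    // column was never registered in the reader's field list, the filter falls
    // back to rangesForMissingColumns, which is empty for predicates that do not
    // accept nulls — so every row is filtered out.
    static final long EMPTY = 0;

    static long applyPredicate(Set<String> readerColumns, String predicateColumn,
                               boolean acceptsNull, long rowCount) {
        if (!readerColumns.contains(predicateColumn)) {
            return acceptsNull ? rowCount : EMPTY;
        }
        return rowCount; // the real code would intersect with the column index here
    }

    public static void main(String[] args) {
        // The evolved partition column is missing from parquetColumnFields,
        // and the equality predicate on it does not accept nulls.
        long rows = applyPredicate(Set.of("id", "name"), "new_partition_col", false, 1000);
        System.out.println(rows); // prints "0": the query returns no rows
    }
}
```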

@github-actions

github-actions bot commented Sep 4, 2024

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Sep 4, 2024
@mosabua
Member

mosabua commented Sep 18, 2024

@electrum is this something you are still pursuing?

@mosabua
Member

mosabua commented Sep 18, 2024

@cwsteinbach @alexjo2144 @findinpath ... I checked with @electrum and it would be good if someone from the team could pick this up.

@github-actions github-actions bot removed the stale label Sep 19, 2024
@github-actions

This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua

@github-actions github-actions bot added the stale label Oct 11, 2024
@electrum electrum closed this Oct 11, 2024
@electrum electrum deleted the iceberg-parquet branch October 11, 2024 18:25

Development

Successfully merging this pull request may close these issues.

Skip reading Parquet pages using Column Indexes for Iceberg

8 participants