Skip to content

Support OR-ed condition in Delta checkpoint iterator#19439

Merged
findepi merged 2 commits intotrinodb:masterfrom
findinpath:findinpath/delta-checkpoint-entry-or
Oct 23, 2023
Merged

Support OR-ed condition in Delta checkpoint iterator#19439
findepi merged 2 commits intotrinodb:masterfrom
findinpath:findinpath/delta-checkpoint-entry-or

Conversation

@findinpath
Copy link
Copy Markdown
Contributor

@findinpath findinpath commented Oct 18, 2023

Description

Relates to #19156

Very much inspired from #19240

The problem this PR is solving is that it avoids working with TupleDomain.all() as tuple domain when there are more predicates specified for the CheckpointEntryIterator:

TupleDomain<HiveColumnHandle> tupleDomain = columns.size() > 1 ?
TupleDomain.all() :

What this means is that in such cases, all the entries from the Delta Lake Parquet checkpoint file will be returned as results even though the code flow is interested in a small subset of the entries.

A common use-case of specifying more predicates (entry types) for the CheckpointEntryIterator is DeltaLakeMetadata.getTableHandle():

logEntries = transactionLogAccess.getTransactionLogEntries(
session,
tableSnapshot,
ImmutableSet.of(METADATA, PROTOCOL),

Additional context and related issues

The ParquetPageSourceFactory has been slightly refactored to expose a method handling multiple tuple domains.
The results returned in the page source are corresponding to any of the specified tuple domains.

To put things in perspective, the changes from this PR have been tested against a 300MB checkpoint file to see how it performs.
The example showcased below depends on a local file, but the changes have been tested as well against a S3 bucket and for retrieving metadata & protocol together there is no relevant performance gain.

tldr; performance wise the changes coming with this PR are rather modest because the amount of bytes read from the Parquet columnar file for metadata and protocol is pretty small compared to the bytes needed to encode the add entries.

    @Test
    public void testBytesRead()
            throws IOException
    {
        String targetPath = "file:///home/marius/Downloads/00000000000000000003.checkpoint.parquet";

        CheckpointEntryIterator metadataEntryIterator =
                createCheckpointEntryIterator(URI.create(targetPath), ImmutableSet.of(METADATA), Optional.empty(), Optional.empty());
        CheckpointEntryIterator protocolEntryIterator =
                createCheckpointEntryIterator(URI.create(targetPath), ImmutableSet.of(PROTOCOL), Optional.empty(), Optional.empty());
        MetadataEntry metadataEntry = getOnlyElement(metadataEntryIterator).getMetaData();
        ProtocolEntry protocolEntry = getOnlyElement(protocolEntryIterator).getProtocol();

        CheckpointEntryIterator metadataAndProtocolEntryIterator =
                createCheckpointEntryIterator(URI.create(targetPath), ImmutableSet.of(METADATA, PROTOCOL), Optional.empty(), Optional.empty());
        CheckpointEntryIterator addEntryIterator = createCheckpointEntryIterator(
                URI.create(targetPath),
                ImmutableSet.of(ADD),
                Optional.of(metadataEntry),
                Optional.of(protocolEntry));

        System.out.println("METADATA");
        System.out.println("completed positions: " + metadataEntryIterator.getCompletedPositions());
        System.out.println("completed bytes: " + metadataEntryIterator.getCompletedBytes());
        System.out.println("PROTOCOL");
        System.out.println("completed positions: " + protocolEntryIterator.getCompletedPositions());
        System.out.println("completed bytes: " + protocolEntryIterator.getCompletedBytes());
        System.out.println("METADATA&PROTOCOL");
        long startMetadataAndProtocol = System.currentTimeMillis();
        System.out.println("size: " + Iterators.size(metadataAndProtocolEntryIterator));
        long endMetadataAndProtocol = System.currentTimeMillis();
        System.out.println("Elapsed Time in milli seconds: " + (endMetadataAndProtocol - startMetadataAndProtocol));
        System.out.println("completed positions: " + metadataAndProtocolEntryIterator.getCompletedPositions());
        System.out.println("completed bytes: " + metadataAndProtocolEntryIterator.getCompletedBytes());
        System.out.println("ADD");
        long startAdd = System.currentTimeMillis();
        System.out.println("size: " + Iterators.size(addEntryIterator));
        long endAdd = System.currentTimeMillis();
        System.out.println("Elapsed Time in milli seconds: " + (endAdd - startAdd));
        System.out.println("completed positions: " + addEntryIterator.getCompletedPositions());
        System.out.println("completed bytes: " + addEntryIterator.getCompletedBytes());
    }

The results on testing with master

METADATA
completed positions: OptionalLong[470000]
completed bytes: 49876
PROTOCOL
completed positions: OptionalLong[470000]
completed bytes: 49224
METADATA&PROTOCOL
size: 2
Elapsed Time in milli seconds: 261
completed positions: OptionalLong[1235156]
completed bytes: 50782
ADD
size: 1235154
Elapsed Time in milli seconds: 14040
completed positions: OptionalLong[1235156]
completed bytes: 329586188

The results on testing with the changes from this PR:

METADATA
completed positions: OptionalLong[470000]
completed bytes: 49876
PROTOCOL
completed positions: OptionalLong[470000]
completed bytes: 49224
METADATA&PROTOCOL
size: 2
Elapsed Time in milli seconds: 121
completed positions: OptionalLong[470000]
completed bytes: 49948
ADD
size: 1235154
Elapsed Time in milli seconds: 14194
completed positions: OptionalLong[1235156]
completed bytes: 329586188

I slightly modified the example above to test against MinIO and check what kind of conversations are made with the object storage:

        MinioClient minioClient = new MinioClient("http://localhost:9080", "minio-access-key", "minio-secret-key");
        minioClient.captureBucketNotifications(BUCKET_NAME, event -> {
            System.out.println(event.eventType() + " - " + event.requestParameters());
        });

Relevant output while running with this PR

METADATA
s3:ObjectAccessed:Head - {principalId=minio-access-key, region=, sourceIPAddress=172.28.0.1}
s3:ObjectAccessed:Get - {principalId=minio-access-key, range=bytes=-49152, region=, sourceIPAddress=172.28.0.1}
s3:ObjectAccessed:Get - {principalId=minio-access-key, range=bytes=4-727, region=, sourceIPAddress=172.28.0.1}
completed positions: OptionalLong[470000]
completed bytes: 49876
PROTOCOL
s3:ObjectAccessed:Head - {principalId=minio-access-key, region=, sourceIPAddress=172.28.0.1}
s3:ObjectAccessed:Get - {principalId=minio-access-key, range=bytes=-49152, region=, sourceIPAddress=172.28.0.1}
s3:ObjectAccessed:Get - {principalId=minio-access-key, range=bytes=728-799, region=, sourceIPAddress=172.28.0.1}
completed positions: OptionalLong[470000]
completed bytes: 49224
METADATA&PROTOCOL
s3:ObjectAccessed:Head - {principalId=minio-access-key, region=, sourceIPAddress=172.28.0.1}
s3:ObjectAccessed:Get - {principalId=minio-access-key, range=bytes=-49152, region=, sourceIPAddress=172.28.0.1}
s3:ObjectAccessed:Get - {principalId=minio-access-key, range=bytes=4-799, region=, sourceIPAddress=172.28.0.1}
size: 2
Elapsed Time in milli seconds: 126
completed positions: OptionalLong[470000]
completed bytes: 49948

Relevant output while running on master

METADATA
s3:ObjectAccessed:Head - {principalId=minio-access-key, region=, sourceIPAddress=172.28.0.1}
s3:ObjectAccessed:Get - {principalId=minio-access-key, range=bytes=-49152, region=, sourceIPAddress=172.28.0.1}
s3:ObjectAccessed:Get - {principalId=minio-access-key, range=bytes=4-727, region=, sourceIPAddress=172.28.0.1}
completed positions: OptionalLong[470000]
completed bytes: 49876
PROTOCOL
s3:ObjectAccessed:Head - {principalId=minio-access-key, region=, sourceIPAddress=172.28.0.1}
s3:ObjectAccessed:Get - {principalId=minio-access-key, range=bytes=-49152, region=, sourceIPAddress=172.28.0.1}
s3:ObjectAccessed:Get - {principalId=minio-access-key, range=bytes=728-799, region=, sourceIPAddress=172.28.0.1}
completed positions: OptionalLong[470000]
completed bytes: 49224
METADATA&PROTOCOL
s3:ObjectAccessed:Head - {principalId=minio-access-key, region=, sourceIPAddress=172.28.0.1}
s3:ObjectAccessed:Get - {principalId=minio-access-key, range=bytes=-49152, region=, sourceIPAddress=172.28.0.1}
s3:ObjectAccessed:Get - {principalId=minio-access-key, range=bytes=4-799, region=, sourceIPAddress=172.28.0.1}
s3:ObjectAccessed:Get - {principalId=minio-access-key, range=bytes=125355536-125355952, region=, sourceIPAddress=172.28.0.1}
s3:ObjectAccessed:Get - {principalId=minio-access-key, range=bytes=250711810-250712226, region=, sourceIPAddress=172.28.0.1}
size: 2
Elapsed Time in milli seconds: 355
completed positions: OptionalLong[1235156]
completed bytes: 50782

For METADATA & PROTOCOL when specified together, for the file I am using for testing, the CheckpointEntryIterator makes currently (in master`) the extra call:

s3:ObjectAccessed:Get - {principalId=minio-access-key, range=bytes=125355536-125355952, region=, sourceIPAddress=172.28.0.1}

Release notes

(x) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

#
* ({issue}`issuenumber`)

@cla-bot cla-bot bot added the cla-signed label Oct 18, 2023
@findinpath findinpath self-assigned this Oct 18, 2023
@github-actions github-actions bot added tests:hive hudi Hudi connector delta-lake Delta Lake connector hive Hive connector labels Oct 18, 2023
Copy link
Copy Markdown
Member

@findepi findepi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Accept FilterPredicate in ParquetReader"

Copy link
Copy Markdown
Contributor

@jkylling jkylling left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for fixing this!

@findepi
Copy link
Copy Markdown
Member

findepi commented Oct 19, 2023

master
METADATA&PROTOCOL
completed bytes: 50782

changes from this PR:
METADATA&PROTOCOL
completed bytes: 49948

in both cases we read ~50k of 300MB file

@findinpath findinpath added no-release-notes This pull request does not require release notes entry and removed release-notes labels Oct 19, 2023
@jkylling
Copy link
Copy Markdown
Contributor

master
METADATA&PROTOCOL
completed bytes: 50782

changes from this PR:
METADATA&PROTOCOL
completed bytes: 49948

in both cases we read ~50k of 300MB file

The number of bytes read should be approximately the same, but the number of completed positions is very different (1235156 vs. 470000). The bottle neck is that we are iterating over all the rows of the row groups we read, and checking if they contain non-null data for the metadata or protocol entries, but this is only the case for two rows. On master, since the columns we select are all null except in two rows, it's cheap in terms of data, but it's expensive in terms of compute (relative to what this operation is supposed to do), as we iterate over 1235156 rows.

@findinpath findinpath force-pushed the findinpath/delta-checkpoint-entry-or branch from b00803c to e134ec5 Compare October 19, 2023 13:07
@findinpath findinpath force-pushed the findinpath/delta-checkpoint-entry-or branch from e134ec5 to 000da7e Compare October 19, 2023 14:55
@jkylling
Copy link
Copy Markdown
Contributor

master
METADATA&PROTOCOL
completed bytes: 50782

changes from this PR:
METADATA&PROTOCOL
completed bytes: 49948

in both cases we read ~50k of 300MB file

The number of bytes read should be approximately the same, but the number of completed positions is very different (1235156 vs. 470000). The bottle neck is that we are iterating over all the rows of the row groups we read, and checking if they contain non-null data for the metadata or protocol entries, but this is only the case for two rows. On master, since the columns we select are all null except in two rows, it's cheap in terms of data, but it's expensive in terms of compute (relative to what this operation is supposed to do), as we iterate over 1235156 rows.

I went back and tested this PR with similar checkpoint part files to the ones reported in #17405 (comment)

When fetching metadata and protocol entries on master: ~950 ms
When fetching metadata and protocol entries on this PR: ~150 ms

This is a bit at odds with the reported speed up from ~40 seconds to ~1 second when retrieving add entries in the issue. I tested again on master with a TupleDomain.all() filter, and it still takes more than 20 seconds to collect add entries. This can be timed to page.getBlock(i).getLoadedBlock() in https://github.com/trinodb/trino/blob/master/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/CheckpointEntryIterator.java#L700
being considerably slower for add entries than for metadata or protocol entries. This is likely because these tables have a very wide schema, which makes getLoadedBlock() take a significant amount of time per add entry, since the parquet statistics for these blocks is very large (a future optimization would be to only read a subset of the statistics we need). Having a wide table schema should be much less common than to have tables with many remove entries in their checkpoints.

Still, the speed up from 950 ms to 150 ms should be enough for doing this PR. This delay is during the analysis phase, so is usually very noticeable when querying.

@findepi
Copy link
Copy Markdown
Member

findepi commented Oct 20, 2023

thanks @jkylling for the feedback and measuring the times

in both cases we read ~50k of 300MB file

spoke with @ebyhr and realized we measured only the amount of data loaded, but not the number of round trips to the storage.
It the small difference in number of bytes read is effect of saving a bunch of small s3 requests, then this is still an important improvement

@findinpath please measure that (eg with MinIO notifications / request log)

As @findinpath spoke earlier, Open question remains: could we achieve same improvement within ParquetReader, based on column select and all-null information for row groups. This would mean same improvement for chckpoint reading without any checkpoint reader changes, and also for other cases.

@findinpath findinpath force-pushed the findinpath/delta-checkpoint-entry-or branch from 000da7e to d03c3fe Compare October 20, 2023 12:05
@findinpath
Copy link
Copy Markdown
Contributor Author

@findinpath please measure that (eg with MinIO notifications / request log)

@findepi updated the description

For METADATA & PROTOCOL when specified together, for the file I am using for testing, the CheckpointEntryIterator makes currently (in master`) the extra call:

s3:ObjectAccessed:Get - {principalId=minio-access-key, range=bytes=125355536-125355952, region=, sourceIPAddress=172.28.0.1}

@findinpath
Copy link
Copy Markdown
Contributor Author

Open question remains: could we achieve same improvement within ParquetReader, based on column select and all-null information for row groups.

We do already have the null/empty check here:

Map<ColumnDescriptor, Long> columnValueCounts = getColumnValueCounts(block, descriptorsByPath);
Optional<List<ColumnDescriptor>> candidateColumns = parquetPredicate.getIndexLookupCandidates(columnValueCounts, columnStatistics, dataSource.getId());
if (candidateColumns.isEmpty()) {
return false;
}

@findinpath
Copy link
Copy Markdown
Contributor Author

Rebasing on master to resolve conflicts on CheckpointEntryIterator

@findinpath findinpath force-pushed the findinpath/delta-checkpoint-entry-or branch from d03c3fe to 8c9986f Compare October 22, 2023 20:01
@findepi
Copy link
Copy Markdown
Member

findepi commented Oct 23, 2023

per offline discussion with @raunaqmorarka
the alternative -- to make ParquetReader skip row groups when stats indicate column is all null -- is considered complicated, so we should probably land this

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

cla-signed delta-lake Delta Lake connector hive Hive connector hudi Hudi connector no-release-notes This pull request does not require release notes entry

Development

Successfully merging this pull request may close these issues.

4 participants