Add support to be able to query new Delta protocols#22596
Add support to be able to query new Delta protocols#22596tdcmeehan merged 2 commits intoprestodb:masterfrom
Conversation
ee8caf0 to
da3c3ae
Compare
|
PR set to draft while waiting for the delta kernel api to release v3.2.0. https://github.com/delta-io/delta/milestone/26
|
|
Suggest adding the PR # to the items in the release note entry, for example: |
222fcb3 to
2118c6e
Compare
|
After the release of Delta Kernel 3.2.0, all limitations are resolved. Also, it seems that there is an improvement in performance reading both delta v1 and delta v3 (perfomance local tests attached in the PR description). PR is ready for review. |
tdcmeehan
left a comment
There was a problem hiding this comment.
Just some initial minor comments, great work!
presto-delta/pom.xml
Outdated
| <scope>test</scope> | ||
| </dependency> | ||
| <dependency> | ||
| <groupId>org.jetbrains</groupId> |
There was a problem hiding this comment.
What is this dependency used for?
There was a problem hiding this comment.
It was used to annotate sql queries in testing. I removed the annotations and the dependency.
| Optional<Engine> deltaEngine = loadDeltaTableClient(session, location, | ||
| schemaTableName); |
There was a problem hiding this comment.
| Optional<Engine> deltaEngine = loadDeltaTableClient(session, location, | |
| schemaTableName); | |
| Optional<Engine> deltaEngine = loadDeltaTableClient(session, location, schemaTableName); |
| try { | ||
| CloseableIterator<FilteredColumnarBatch> columnBatches = snapshot.getScanBuilder(deltaEngine) | ||
| .build().getScanFiles(deltaEngine); | ||
| Row row = null; | ||
| while (columnBatches.hasNext()) { | ||
| CloseableIterator<Row> rows = columnBatches.next().getRows(); | ||
| if (rows.hasNext()) { | ||
| row = rows.next(); | ||
| break; | ||
| } | ||
| } | ||
| Map<String, String> partitionValues = row != null ? | ||
| InternalScanFileUtils.getPartitionValues(row) : new HashMap<>(0); | ||
| columnBatches.close(); |
There was a problem hiding this comment.
| try { | |
| CloseableIterator<FilteredColumnarBatch> columnBatches = snapshot.getScanBuilder(deltaEngine) | |
| .build().getScanFiles(deltaEngine); | |
| Row row = null; | |
| while (columnBatches.hasNext()) { | |
| CloseableIterator<Row> rows = columnBatches.next().getRows(); | |
| if (rows.hasNext()) { | |
| row = rows.next(); | |
| break; | |
| } | |
| } | |
| Map<String, String> partitionValues = row != null ? | |
| InternalScanFileUtils.getPartitionValues(row) : new HashMap<>(0); | |
| columnBatches.close(); | |
| try (CloseableIterator<FilteredColumnarBatch> columnBatches = snapshot.getScanBuilder(deltaEngine).build().getScanFiles(deltaEngine)) { | |
| Row row = null; | |
| while (columnBatches.hasNext()) { | |
| CloseableIterator<Row> rows = columnBatches.next().getRows(); | |
| if (rows.hasNext()) { | |
| row = rows.next(); | |
| break; | |
| } | |
| } | |
| Map<String, String> partitionValues = row != null ? | |
| InternalScanFileUtils.getPartitionValues(row) : new HashMap<>(0); | |
| columnBatches.close(); |
| List<DeltaColumnHandle> partitionColumns = columnDomains.map(domains -> domains.stream() | ||
| .filter(entry -> entry.getColumn().getColumnType() == PARTITION) | ||
| .map(TupleDomain.ColumnDomain::getColumn) | ||
| .collect(Collectors.toList())).orElse(Collections.emptyList()); |
There was a problem hiding this comment.
| .collect(Collectors.toList())).orElse(Collections.emptyList()); | |
| .collect(toImmutableList())).orElse(ImmutableList.of()); |
| row = nextFile.getRows(); | ||
| } | ||
| else { | ||
| logger.debug("There are still rows remaining int the iterator, not advancing the iterator"); |
steveburnett
left a comment
There was a problem hiding this comment.
A single nit of punctuation, otherwise LGTM (docs). Thanks!
| @@ -41,6 +41,8 @@ Property Name Description | |||
| In order for this option to work, also set | |||
| ``experimental.pushdown-dereference-enabled`` to | |||
| ``true``. | |||
| ``delta.case-sensitive-partitions-enabled`` Allows matching the names of partitioned columns in a ``true`` | |||
| case-sensitive manner | |||
There was a problem hiding this comment.
| case-sensitive manner | |
| case-sensitive manner. |
agrawalreetika
left a comment
There was a problem hiding this comment.
Thanks for the PR. Had few tables created with new Protocol earlier but wasn't accessible before this new Protocol changes support. Now able to access those tables.
Dropped in few minor comments, please check!
You mentioned about performance improvement, Just curious which part of the new changes are contributing to performance improvement while reading the table?
| format("Could not move to latest snapshot on table '%s.%s'", schemaTableName.getSchemaName(), | ||
| schemaTableName.getTableName())); | ||
| } | ||
| } |
There was a problem hiding this comment.
For better readability, should we move this code block into a method something like getSnapshot instead?
| */ | ||
| public CloseableIterator<AddFile> listFiles(ConnectorSession session, DeltaTable deltaTable) | ||
| public CloseableIterator<FilteredColumnarBatch> listFiles(ConnectorSession session, DeltaTable deltaTable) | ||
| { |
There was a problem hiding this comment.
We can do null check for deltaTable here -
requireNonNull(deltaTable, "deltaTable is null");
Also I think, we can shift deltaTable.getSnapshotId().isPresent() check in here in the start itself from line 148.
| row.close(); | ||
| } | ||
| catch (IOException e) { | ||
| throw new GenericInternalException("Cloud not close row batch", e); |
There was a problem hiding this comment.
| throw new GenericInternalException("Cloud not close row batch", e); | |
| throw new GenericInternalException("Could not close row batch", e); |
| * @param hiveTableName Name of the Hive table that the Delta table is to be registered as in HMS | ||
| */ | ||
| private static void registerDeltaTableInHMS(QueryRunner queryRunner, String deltaTableName, String hiveTableName) | ||
| protected static void registerDeltaTableInHMS(QueryRunner queryRunner, String deltaTableName, String hiveTableName) |
There was a problem hiding this comment.
Don't see usage of this outside this class. Any reason for changing this from private?
There was a problem hiding this comment.
I intended in use it in other test class but at the end I didn't and I forgot to change the method visibility back. Reverted to private.
2118c6e to
cf45e29
Compare
|
@agrawalreetika performance improvement is due to the new Kernel API. |
steveburnett
left a comment
There was a problem hiding this comment.
LGTM! (docs)
Pull updated branch, new local docs build, looks good. Thanks!
|
@mblanco-denodo Could you please squash the commits? |
cf45e29 to
8b33456
Compare
|
@agrawalreetika commits squashed into two as requested:
|
| Optional<Engine> deltaEngine = loadDeltaTableClient(session, | ||
| new Path(deltaTable.getTableLocation()), | ||
| new SchemaTableName(deltaTable.getSchemaName(), deltaTable.getTableName())); | ||
| if (!deltaEngine.isPresent()) { |
There was a problem hiding this comment.
Do you know under what circumstances this error could happen?
There was a problem hiding this comment.
Right now it should not happen, as the method loadDeltaTableClient will return a non null Engine or throw an exception. The usage of Optional in that method is inherited from the previous code, and it is a good practice to check if deltaEngine.isPresent() as in future iterations the logic of loadDeltaTableClient could change (note that all the Delta Kernel API is annotated with the @Evolving annotation, and they are making big API changes even on minor versions).
| return this.caseSensitivePartitionsEnabled; | ||
| } | ||
|
|
||
| @Config("delta.case-sensitive-partitions-enabled") |
There was a problem hiding this comment.
To get a little clarity, was the issue of not being able to query mixed-case/upper-case columns only happening if the column is a partitioned column? Can we also have regular columns as mixed case or upper case, and if yes, does it work as expected?
There was a problem hiding this comment.
We've experienced the problem only on partitioned columns, as the non partitioned columns are in parquet data and reading parquet the case issue does not happen. However the fix should work on both cases.
There was a problem hiding this comment.
The reason I wanted to get an idea is we have removed the toLowerCase conversion for all columns not just the partitioned columns. That's why wanted to make sure it does not have any side effects for regular columns and if it's required even for regular columns, should we rename the config to delta.case-sensitive-columns-enabled?
There was a problem hiding this comment.
I can add a test on regular columns with and without the property to be sure
There was a problem hiding this comment.
I confirm that with delta.case-sensitive-partitions-enabled=false uppercase non partitioned columns are properly returning the data, so I think we should leave the property name as it is because it only changes the behavior to partitioned columns. I added the new test that compares the output between a table with a non partitioned column in lowercase and other table with the same column in uppercase to assure that there is no side effect and both tables return the same data.
There was a problem hiding this comment.
Thanks for adding the tests and confirming.
8b33456 to
0142f1e
Compare
steveburnett
left a comment
There was a problem hiding this comment.
LGTM! (docs)
Pull updated branch, local doc build, looks good. Thanks!
imjalpreet
left a comment
There was a problem hiding this comment.
LGTM 👍🏼
@mblanco-denodo Can you please rebase on master to fix the failing test suite?
0142f1e to
1608031
Compare
| snapshot = deltaTable.getLatestSnapshot(deltaEngine); // get the latest snapshot | ||
| } | ||
| catch (TableNotFoundException e) { | ||
| throw new PrestoException(GENERIC_INTERNAL_ERROR, |
There was a problem hiding this comment.
embed the original exception as the cause here
There was a problem hiding this comment.
Also this should be a user error. Can you check in StandardErrorCodes and DeltaErrorCode to check to see if there's an appropriate user exception for this? If not, please add one.
There was a problem hiding this comment.
Done. Added new DeltaErrorCodes to candle these cases
| String format = ((SnapshotImpl) snapshot).getMetadata().getFormat().getProvider(); | ||
| if (!PARQUET.name().equalsIgnoreCase(format)) { | ||
| throw new PrestoException(DELTA_UNSUPPORTED_DATA_FORMAT, | ||
| format("Delta table %s has unsupported data format: %s. Currently only Parquet data format is supported", schemaTableName, format)); |
There was a problem hiding this comment.
Currently only --> Only the
| catch (TableNotFoundException e) { | ||
| throw new PrestoException(NOT_FOUND, | ||
| format("Delta table (%s.%s) no longer exists.", deltaTable.getSchemaName(), deltaTable.getTableName())); | ||
| format("Delta table not found in '%s'", deltaTable.getTableLocation())); |
| Map<String, Domain> expectedConstraint, | ||
| Map<String, Domain> expectedEnforcedConstraint) | ||
| { | ||
| System.out.println(tableScanWithConstraints(tableName, expectedConstraint, expectedEnforcedConstraint).toString()); |
| new Path(deltaTable.getTableLocation()), | ||
| new SchemaTableName(deltaTable.getSchemaName(), deltaTable.getTableName())); | ||
| if (!deltaEngine.isPresent()) { | ||
| throw new PrestoException(GENERIC_INTERNAL_ERROR, |
There was a problem hiding this comment.
Please throw a system error here. GENERIC_INTERNAL_ERROR is an error for anything that is not categorized, so we shouldn't be intentionally throwing it.
| format(TABLE_NOT_FOUND_ERROR_TEMPLATE, tableName.getSchemaName(), tableName.getTableName())); | ||
| } | ||
| catch (IOException e) { | ||
| throw new GenericInternalException("Could not close columnar batch row", e); |
There was a problem hiding this comment.
Please consider rethrowing an UncheckedIOException
70601f6
1608031 to
70601f6
Compare
|
@mblanco-denodo The directory structure for the tests are pretty long and its breaking our build due to internal limit on path length (<490). Can we have a fix for this? cc : @tdcmeehan |
|
This refactoring only made the directory structure slightly larger, by introducing the |

Description
Use of delta-kernel-api instead of delta-standalone-api in order to be able to query delta tables in newer protocols (delta reader 3, delta writer 7)
Added new configurable parameter (
delta.case-sensitive-partitions-enabled) to allow case sensitive partition matching, allowing to query tables with partition names in uppercase. By default it is set totrue.Motivation and Context
Solves #22543 and #21828.
Impact
Based on some query executions over some data in S3 (times in seconds):
Also experienced huge improvement on some queries during testing. Here are the HTML reports of the tests suites in master and in the branch:
master_vs_branch_test_execution_performance.zip
Test Plan
Test refactor so that the current tests are all executed on older and newer versions of the same test data.
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.
== RELEASE NOTES ==
Delta Connector Changes
22596delta.case-sensitive-partitions-enabledto be able to query data with partitioned columns with column names in uppercase. This property is set totrueby default. :pr:22596