
Iceberg: Support applying equality deletes as a join#21605

Merged
tdcmeehan merged 3 commits into prestodb:master from jasonf20:equality-deletes-as-join-pr
Jan 24, 2024

Conversation

@jasonf20
Collaborator

@jasonf20 jasonf20 commented Dec 27, 2023

Description

This PR adds support for applying equality deletes as a join instead of while reading the split data.

Since equality deletes often apply to many files, the current
implementation ends up opening the delete files #splits * #delete_files
times, which can be very large for CDC/upsert use cases.

This commit implements equality deletes as a join. A connector optimizer
is added to apply the appropriate join(s).
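To make the scale of the open-count asymmetry concrete, here is a back-of-the-envelope sketch (the numbers are hypothetical, in the spirit of the ~150-delete-file example below; this is an illustration of the counting argument, not measured behavior):

```python
# Hypothetical illustration of delete-file open counts (numbers are made up).
splits, delete_files = 1000, 150

# Current approach: every split re-opens every delete file while reading.
opens_per_split_approach = splits * delete_files

# Join approach: each delete file is scanned once as the build side of a join.
opens_join_approach = delete_files

print(opens_per_split_approach)  # 150000
print(opens_join_approach)       # 150
```

Each additional delete file costs #splits extra opens under the current approach, but only one extra scan under the join approach.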

Motivation and Context

Currently, the use case of frequent updates with MoR + eventual compaction doesn't work because querying tables with more than a couple of delete files is too slow. This change enables new use cases for Iceberg in Presto, such as CDC.

Example

This example queries a table partition with ~5 GB of data files + ~150 delete files. CPU time: 39 seconds vs. 5:30 hours.
[screenshots: query statistics before and after the change]

For each additional delete file, the current implementation needs to load #splits more files, while the new method only loads 1 more file, so the difference keeps growing as delete files accumulate.
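The semantics the plan below implements can be sketched in a few lines (a hand-rolled illustration, not the Presto code): a data row is dropped when some equality-delete row matches it on the equality columns and carries a strictly greater `$data_sequence_number` — exactly the left join plus `IS_NULL` filter visible in the plan.

```python
# Illustrative sketch of equality-deletes-as-a-join semantics (not Presto code).
# A data row survives unless a delete row matches it on the equality columns
# with a strictly greater $data_sequence_number -- the left join + IS NULL filter.

def apply_equality_deletes(data_rows, delete_rows, eq_cols):
    surviving = []
    for row in data_rows:
        deleted = any(
            all(row[c] == d[c] for c in eq_cols)
            and row["$data_sequence_number"] < d["$data_sequence_number"]
            for d in delete_rows
        )
        if not deleted:  # corresponds to IS_NULL(delete $data_sequence_number)
            surviving.append(row)
    return surviving

data = [
    {"id": 1, "v": "a",  "$data_sequence_number": 1},
    {"id": 2, "v": "b",  "$data_sequence_number": 1},
    {"id": 2, "v": "b2", "$data_sequence_number": 3},  # re-inserted after the delete
]
deletes = [{"id": 2, "$data_sequence_number": 2}]

print(apply_equality_deletes(data, deletes, ["id"]))
# keeps id=1 and the re-inserted id=2 row; drops the older id=2 row
```

Note the strict `<` on sequence numbers: a row written after the delete (higher sequence number) is not affected by it, which is why the re-inserted `id=2` row survives.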

Query Plan

- Output[PlanNodeId 9][_col0] => [count:bigint]
        _col0 := count (1:16)
    - Aggregate(FINAL)[PlanNodeId 4] => [count:bigint]
            count := "presto.default.count"((count_9)) (1:16)
        - LocalExchange[PlanNodeId 393][SINGLE] () => [count_9:bigint]
            - RemoteStreamingExchange[PlanNodeId 399][GATHER] => [count_9:bigint]
                - Aggregate(PARTIAL)[PlanNodeId 397] => [count_9:bigint]
                        count_9 := "presto.default.count"((build_tag)) (1:16)
                    - FilterProject[PlanNodeId 326,2][filterPredicate = IS_NULL($data_sequence_number_6), projectLocality = LOCAL] => [build_tag:varchar]
                            Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 3,825,758.00, network: 3,825,758.00}/{source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 3,825,758.00, network: 3,825,758.00}
                        - LeftJoin[PlanNodeId 225][("file_name_4" = "file_name_7") AND ("partition_date" = "partition_date_8") AND ($data_sequence_number_5) < ($data_sequence_number_6)][$hashvalue, $hashvalue_10] => [build_tag:varchar, $data_sequence_number_6:bigint]
                                Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 3,825,758.00, network: 3,825,758.00}
                                Distribution: REPLICATED
                                SortExpression[$data_sequence_number_6]
                            - ScanProject[PlanNodeId 0,444][table = TableHandle {connectorId='iceberg', connectorHandle='files_upsert$data_without_equality_deletes@Optional[6805049505336407491]', layout='Optional[files_upsert$data_without_equality_deletes@Optional[6805049505336407491]]'}, projectLocality = LOCAL] => [partition_date:date, build_tag:varchar, $data_sequence_number_5:bigint, file_name_4:varchar, $hashvalue:bigint]
                                    Estimates: {source: CostBasedSourceInfo, rows: 140,725,454 (1.18GB), cpu: 3,907,632,853.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 140,725,454 (1.18GB), cpu: 9,081,794,792.00, memory: 0.00, network: 0.00}
                                    $hashvalue := combine_hash(combine_hash(BIGINT'0', COALESCE($operator$hash_code(file_name_4), BIGINT'0')), COALESCE($operator$hash_code(partition_date), BIGINT'0')) (1:38)
                                    partition_date := 3:partition_date:date (1:38)
                                    $data_sequence_number_5 := 2147482646:$data_sequence_number:bigint
                                    build_tag := 6:build_tag:varchar (1:38)
                                    file_name_4 := 2:file_name:varchar
                            - LocalExchange[PlanNodeId 373][HASH][$hashvalue_10] (file_name_7, partition_date_8) => [file_name_7:varchar, partition_date_8:date, $data_sequence_number_6:bigint, $hashvalue_10:bigint]
                                    Estimates: {source: CostBasedSourceInfo, rows: 122,904 (1.05MB), cpu: 10,371,138.00, memory: 0.00, network: 3,825,758.00}
                                - RemoteStreamingExchange[PlanNodeId 311][REPLICATE] => [file_name_7:varchar, partition_date_8:date, $data_sequence_number_6:bigint, $hashvalue_11:bigint]
                                        Estimates: {source: CostBasedSourceInfo, rows: 122,904 (1.05MB), cpu: 6,545,380.00, memory: 0.00, network: 3,825,758.00}
                                    - ScanProject[PlanNodeId 224,445][table = TableHandle {connectorId='iceberg', connectorHandle='files_upsert$equality_deletes@Optional[6805049505336407491]', layout='Optional[files_upsert$equality_deletes@Optional[6805049505336407491]]'}, projectLocality = LOCAL] => [file_name_7:varchar, partition_date_8:date, $data_sequence_number_6:bigint, $hashvalue_12:bigint]
                                            Estimates: {source: CostBasedSourceInfo, rows: 122,904 (1.05MB), cpu: 2,719,622.00, memory: 0.00, network: 0.00}/{source: CostBasedSourceInfo, rows: 122,904 (1.05MB), cpu: 6,545,380.00, memory: 0.00, network: 0.00}
                                            $hashvalue_12 := combine_hash(combine_hash(BIGINT'0', COALESCE($operator$hash_code(file_name_7), BIGINT'0')), COALESCE($operator$hash_code(partition_date_8), BIGINT'0'))
                                            file_name_7 := 2:file_name:varchar
                                            $data_sequence_number_6 := 2147482646:$data_sequence_number:bigint
                                            partition_date_8 := 3:partition_date:date

Impact

A new configuration parameter controls whether this is enabled.

Test Plan

  • Wrote new tests that cover edge cases with this implementation
  • Changed all existing equality delete tests to also test with this feature enabled
  • Tested manually with tables that I had that didn't work with Presto previously (see above)

Contributor checklist

  • Please make sure your submission complies with our development, formatting, commit message, and attribution guidelines.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

== RELEASE NOTES ==


Iceberg Changes
* Add support for querying ``"$path"`` which returns the file path containing the row
* Add support for querying ``"$data_sequence_number"`` which returns the Iceberg data sequence number of the file containing the row 
* Add support for applying equality deletes as a join. This can be enabled with the session property ``iceberg.delete_as_join_pushdown_enabled``.

SPI Changes
* Add support for connectors to return joins in ``ConnectorPlanOptimizer.optimize``

@jasonf20 jasonf20 requested review from a team and shrinidhijoshi as code owners December 27, 2023 09:44
@jasonf20 jasonf20 requested a review from presto-oss December 27, 2023 09:44
@linux-foundation-easycla

linux-foundation-easycla bot commented Dec 27, 2023

CLA Signed

The committers listed above are authorized under a signed CLA.

@jasonf20 jasonf20 force-pushed the equality-deletes-as-join-pr branch from b27553d to 2bc5098 Compare December 27, 2023 11:50
@jasonf20 jasonf20 changed the title Equality deletes as join pr Iceberg: Support applying equality deletes as a join Dec 27, 2023
@jasonf20 jasonf20 force-pushed the equality-deletes-as-join-pr branch from 2bc5098 to c85266a Compare December 31, 2023 11:16
Contributor

@tdcmeehan tdcmeehan left a comment

Just leaving some generic comments on the plan optimizer, please generalize the feedback where possible. I will need more time to review.

Contributor

@tdcmeehan tdcmeehan left a comment

Iceberg: Support metadata columns

Can we update the documentation as well noting these new columns (iceberg.rst)?

@tdcmeehan tdcmeehan self-assigned this Jan 4, 2024
Contributor

@tdcmeehan tdcmeehan left a comment

Allow connector optimizers to create joins

I think this approach is OK. I would move Type and EquiJoinClause out to top-level classes because they're shared between the ConnectorJoinNode and the original JoinNode, and the inner-class relationship these classes have to ConnectorJoinNode may imply that they're somehow specific to the connector plan optimizer.

Contributor

@tdcmeehan tdcmeehan left a comment

Iceberg: Support applying equality deletes as a join

I think broadcast join is a global property of the query. Is it expected that users will manually specify that they enable broadcast join?

@jasonf20
Collaborator Author

jasonf20 commented Jan 7, 2024

Iceberg: Support applying equality deletes as a join

I think broadcast join is a global property of the query. Is it expected that users will manually specify that they enable broadcast join?

From what I understand, the default is AUTOMATIC, which allows broadcast. Is that not the case? I was getting broadcast joins when running explain plan.

I think I addressed all the other comments.

@jasonf20 jasonf20 force-pushed the equality-deletes-as-join-pr branch from 6d98edd to 1c8e8e8 Compare January 7, 2024 09:41
@github-actions

github-actions bot commented Jan 7, 2024

Codenotify: Notifying subscribers in CODENOTIFY files for diff 0907cd8...2df0c25.

Notify File(s)
@steveburnett presto-docs/src/main/sphinx/connector/iceberg.rst

@jasonf20 jasonf20 force-pushed the equality-deletes-as-join-pr branch from 1c8e8e8 to b3821ae Compare January 7, 2024 09:46
Contributor

@yingsu00 yingsu00 left a comment

@jasonf20 Hi Jason, will you be able to post the plan after this change in the PR message? Thanks!

Member

@hantangwangd hantangwangd left a comment

Overall looks good to me. A few small issues to figure out.

Member

Why not use something like java.util.Collections.emptyIterator? The import used here seems to lead to some dependency problems.

Collaborator Author

bad import by mistake, thanks for catching this.

Contributor

@steveburnett steveburnett left a comment

Thanks for the documentation! Two small nits only.

@jasonf20 jasonf20 requested a review from steveburnett January 8, 2024 17:17
@jasonf20
Collaborator Author

jasonf20 commented Jan 8, 2024

@yingsu00 added

@jasonf20 jasonf20 requested a review from tdcmeehan January 8, 2024 19:01
Contributor

@steveburnett steveburnett left a comment

Thanks for the quick reply and fixes! I found one more nit when I did a new local build of the docs for you to consider.

@jasonf20 jasonf20 requested a review from steveburnett January 8, 2024 21:28
Contributor

@steveburnett steveburnett left a comment

LGTM! (docs)

Pulled the branch again, new local build of the docs, everything looks good. Thanks!

@yingsu00 yingsu00 requested review from aaneja and rschlussel January 22, 2024 08:31
Contributor

@yingsu00 yingsu00 left a comment

@jasonf20 Thanks for uploading the query plan. Could you please add another actual plan for multiple equality delete files with different equality ids? You can use "explain analyze" to get an actual plan.

Also, I wonder if you could expand the tests to test its interplay with "iceberg.pushdown_filter_enabled" with ORC files (Java Parquet doesn't support filter pushdown). It'll be great if you can add an actual plan for that case too.

@yingsu00
Contributor

@jasonf20 Will you be able to do a small comparative TPCDS run with this equality join turned on/off and with/without equality delete files to make sure there is no regression? Without any equality delete files in the data, the before/after plans should be the same and performance should also be the same. With some equality delete files, the new plans have more joins and may affect join reordering later on. If we can show there are no regressions, I think the PR is good to go. Thank you!

@jasonf20 jasonf20 force-pushed the equality-deletes-as-join-pr branch 3 times, most recently from 75778fb to f1f3c4f Compare January 23, 2024 11:11
@jasonf20
Collaborator Author

Hi @yingsu00

Thanks for the comments, Adding the explain plan to the end of this comment.

Regarding iceberg.pushdown_filter_enabled: based on your comment, I changed the equality delete scan node to pass TupleDomain.all(), so this won't affect the generated scan nodes and is now unrelated to the filtering that this may perform on the original scan node. In theory a subset of the filter could be applied to the equality delete files, but since these are typically quite small, this optimization is negligible. I think if we want to add this it can be done in a separate PR, and it doesn't introduce a regression since the existing implementation doesn't filter them either. There are a few issues with adding a spec to validate this:

  1. "pushdown_filter_enabled" only works in native mode and the current spec is in Java.
  2. Equality delete files are currently always written as Parquet, and the code that writes ORC equality delete files is not available due to missing packages; adding the package is non-trivial since there are version conflicts with other ORC packages. Given that the delete scan node passes TupleDomain.all(), it feels safe to assume the parameter won't interact with these changes without explicitly adding a spec for that.

Regarding TPCDS testing, there is currently no API to create equality delete files in Presto, so there isn't a way to run this test with TPCDS. The rewriter explicitly returns the current node when there are no equality delete files, so this code shouldn't cause any change to the explain plan for queries without equality delete files. For tables with equality delete files (which can't be created using Presto), the existing implementation is almost unusable for even small amounts of deletes (see the initial performance boost in the PR message). The possibility of a regression when equality delete files exist is mitigated by the fact that previously they were not really queryable.

Given this information, how should we proceed?

Explain plan with multiple delete schemas:

MaterializedResult{rows=[[Fragment 1 [HASH]
    CPU: 159.75ms, Scheduled: 2.08s, Input: 27 rows (3.37kB); per task: avg.: 6.75 std.dev.: 4.60, Output: 18 rows (2.18kB), 4 tasks
    Output layout: [nationkey, name, regionkey, comment, $data_sequence_number_15, nationkey_17, $data_sequence_number_16, regionkey_19, $data_sequence_number_18, regionkey_22, nationkey_21, $data_sequence_number_20]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - Filter[PlanNodeId 132][filterPredicate = IS_NULL(COALESCE($data_sequence_number_16, $data_sequence_number_18, $data_sequence_number_20))] => [nationkey:bigint, name:varchar, regionkey:bigint, comment:varchar, $data_sequence_number_15:bigint, nationkey_17:bigint, $data_sequence_number_16:bigint, regionkey_19:bigint, $data_sequence_number_18:bigint, regionkey_22:bigint, nationkey_21:bigint, $data_sequence_number_20:bigint]
            Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 241.00, network: ?}
            CPU: 2.00ms (0.89%), Scheduled: 18.00ms (0.39%), Output: 18 rows (2.18kB)
            Input avg.: 1.56 rows, Input std.dev.: 198.51%
        - LeftJoin[PlanNodeId 131][("regionkey" = "regionkey_22") AND ("nationkey" = "nationkey_21") AND ($data_sequence_number_15) < ($data_sequence_number_20)][$hashvalue_32, $hashvalue_33] => [nationkey:bigint, name:varchar, regionkey:bigint, comment:varchar, $data_sequence_number_15:bigint, nationkey_17:bigint, $data_sequence_number_16:bigint, regionkey_19:bigint, $data_sequence_number_18:bigint, regionkey_22:bigint, nationkey_21:bigint, $data_sequence_number_20:bigint]
                Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 241.00, network: ?}
                CPU: 92.00ms (41.07%), Scheduled: 1.43s (31.00%), Output: 25 rows (3.54kB)
                Left (probe) Input avg.: 1.56 rows, Input std.dev.: 198.51%
                Right (build) Input avg.: 0.06 rows, Input std.dev.: 387.30%
                Distribution: PARTITIONED
                SortExpression[$data_sequence_number_20]
            - Project[PlanNodeId 424][projectLocality = LOCAL] => [nationkey:bigint, name:varchar, regionkey:bigint, comment:varchar, $data_sequence_number_15:bigint, nationkey_17:bigint, $data_sequence_number_16:bigint, regionkey_19:bigint, $data_sequence_number_18:bigint, $hashvalue_32:bigint]
                    Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 130.00, network: ?}
                    CPU: 3.00ms (1.34%), Scheduled: 8.00ms (0.17%), Output: 25 rows (3.48kB)
                    Input avg.: 1.56 rows, Input std.dev.: 198.51%
                    $hashvalue_32 := combine_hash(combine_hash(BIGINT'0', COALESCE($operator$hash_code(regionkey), BIGINT'0')), COALESCE($operator$hash_code(nationkey), BIGINT'0')) (1:31)
                - LeftJoin[PlanNodeId 129][("regionkey" = "regionkey_19") AND ($data_sequence_number_15) < ($data_sequence_number_18)][$hashvalue, $hashvalue_29] => [nationkey:bigint, name:varchar, regionkey:bigint, comment:varchar, $data_sequence_number_15:bigint, nationkey_17:bigint, $data_sequence_number_16:bigint, regionkey_19:bigint, $data_sequence_number_18:bigint]
                        Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 130.00, network: ?}
                        CPU: 48.00ms (21.43%), Scheduled: 533.00ms (11.57%), Output: 25 rows (3.33kB)
                        Left (probe) Input avg.: 1.56 rows, Input std.dev.: 198.51%
                        Right (build) Input avg.: 0.06 rows, Input std.dev.: 387.30%
                        Distribution: PARTITIONED
                        SortExpression[$data_sequence_number_18]
                    - RemoteSource[2] => [nationkey:bigint, name:varchar, regionkey:bigint, comment:varchar, $data_sequence_number_15:bigint, nationkey_17:bigint, $data_sequence_number_16:bigint, $hashvalue:bigint]
                            CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 25 rows (3.30kB)
                            Input avg.: 1.56 rows, Input std.dev.: 198.51%
                    - LocalExchange[PlanNodeId 341][HASH][$hashvalue_29] (regionkey_19) => [regionkey_19:bigint, $data_sequence_number_18:bigint, $hashvalue_29:bigint]
                            Estimates: {source: CostBasedSourceInfo, rows: 1 (238B), cpu: 251.00, memory: 0.00, network: 65.00}
                            CPU: 0.00ns (0.00%), Scheduled: 28.00ms (0.61%), Output: 1 row (27B)
                            Input avg.: 0.06 rows, Input std.dev.: 387.30%
                        - RemoteSource[5] => [regionkey_19:bigint, $data_sequence_number_18:bigint, $hashvalue_30:bigint]
                                CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 1 row (27B)
                                Input avg.: 0.06 rows, Input std.dev.: 387.30%
            - LocalExchange[PlanNodeId 342][HASH][$hashvalue_33] (regionkey_22, nationkey_21) => [regionkey_22:bigint, nationkey_21:bigint, $data_sequence_number_20:bigint, $hashvalue_33:bigint]
                    Estimates: {source: CostBasedSourceInfo, rows: 1 (275B), cpu: 564.00, memory: 0.00, network: 120.00}
                    CPU: 0.00ns (0.00%), Scheduled: 5.00ms (0.11%), Output: 1 row (36B)
                    Input avg.: 0.06 rows, Input std.dev.: 387.30%
                - Project[PlanNodeId 426][projectLocality = LOCAL] => [regionkey_22:bigint, nationkey_21:bigint, $data_sequence_number_20:bigint, $hashvalue_34:bigint]
                        Estimates: {source: CostBasedSourceInfo, rows: 1 (275B), cpu: 453.00, memory: 0.00, network: 120.00}
                        CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 1 row (36B)
                        Input avg.: 0.06 rows, Input std.dev.: 387.30%
                    - RemoteSource[6] => [regionkey_22:bigint, nationkey_21:bigint, $data_sequence_number_20:bigint, $hashvalue_34:bigint, $hashvalue_35:bigint]
                            CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 1 row (45B)
                            Input avg.: 0.06 rows, Input std.dev.: 387.30%

Fragment 2 [HASH]
    CPU: 70.26ms, Scheduled: 2.43s, Input: 26 rows (3.14kB); per task: avg.: 6.50 std.dev.: 1.12, Output: 25 rows (3.30kB), 4 tasks
    Output layout: [nationkey, name, regionkey, comment, $data_sequence_number_15, nationkey_17, $data_sequence_number_16, $hashvalue_28]
    Output partitioning: HASH [regionkey][$hashvalue_28]
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - Project[PlanNodeId 422][projectLocality = LOCAL] => [nationkey:bigint, name:varchar, regionkey:bigint, comment:varchar, $data_sequence_number_15:bigint, nationkey_17:bigint, $data_sequence_number_16:bigint, $hashvalue_28:bigint]
            Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 65.00, network: 1,951.00}
            CPU: 6.00ms (2.68%), Scheduled: 28.00ms (0.61%), Output: 25 rows (3.30kB)
            Input avg.: 1.56 rows, Input std.dev.: 179.56%
            $hashvalue_28 := combine_hash(BIGINT'0', COALESCE($operator$hash_code(regionkey), BIGINT'0')) (1:31)
        - LeftJoin[PlanNodeId 127][("nationkey" = "nationkey_17") AND ($data_sequence_number_15) < ($data_sequence_number_16)][$hashvalue_23, $hashvalue_25] => [nationkey:bigint, name:varchar, regionkey:bigint, comment:varchar, $data_sequence_number_15:bigint, nationkey_17:bigint, $data_sequence_number_16:bigint]
                Estimates: {source: CostBasedSourceInfo, rows: ? (?), cpu: ?, memory: 65.00, network: 1,951.00}
                CPU: 51.00ms (22.77%), Scheduled: 2.30s (50.02%), Output: 25 rows (3.01kB)
                Left (probe) Input avg.: 1.56 rows, Input std.dev.: 179.56%
                Right (build) Input avg.: 0.06 rows, Input std.dev.: 387.30%
                Distribution: PARTITIONED
                SortExpression[$data_sequence_number_16]
            - RemoteSource[3] => [nationkey:bigint, name:varchar, regionkey:bigint, comment:varchar, $data_sequence_number_15:bigint, $hashvalue_23:bigint]
                    CPU: 0.00ns (0.00%), Scheduled: 0.00ns (0.00%), Output: 25 rows (3.11kB)

@yingsu00
Contributor

@jasonf20 Thanks for the information. Regarding TPCDS testing, I thought you had other engines that can generate equality deletes, but I just learned that it's actually hard for you to generate them now. Prestissimo will support equality deletes but isn't there yet, so I'll withdraw the request for TPCDS tests.

What I was concerned about was that this may affect the overall query shape, since it adds some new join nodes in the logical planning phase, and the join reordering rule performed later may introduce some uncertainty. Sometimes the plan shape changes because of seemingly unrelated issues, and the result may not be ideal. That's why I was hoping you could run the TPCDS benchmark to rule out the possibility of an unexpected plan change. But if testing TPCDS is hard, we will test it ourselves in the future.

@jasonf20
Collaborator Author

Thanks @yingsu00. I did validate that the query shape is unchanged when the feature is disabled, for queries on tables that I had with equality deletes, but not on TPCDS tables, which I don't have access to. In my tests, when there are no equality deletes and/or when the flag is disabled, the explain plan I got was unchanged. Of course this is limited to the queries I tested, but it does give some coverage. I think static analysis of the start of the rewriter can also help validate this.

@jasonf20 jasonf20 force-pushed the equality-deletes-as-join-pr branch from f1f3c4f to 6ff947e Compare January 23, 2024 14:24
Contributor

@aaneja aaneja left a comment

Still reviewing, will come back to this

@jasonf20 jasonf20 force-pushed the equality-deletes-as-join-pr branch from 6ff947e to d691a77 Compare January 23, 2024 14:37
Contributor

nit: There is a variant which has a Collection as its input, so Optional.of(LogicalRowExpressions.and(node.getFilters())) will suffice

Collaborator Author

It ends up being a little more code since I need to check if the filters list is empty.

node.getFilters().isEmpty() ? Optional.empty() : Optional.of(LogicalRowExpressions.and(node.getFilters())),

Which do you prefer?

Collaborator Author

I went with the second option so that it will create a flat Expression instead of a nested one.
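The flat-vs-nested distinction above can be illustrated with a toy expression builder (this is a hypothetical sketch, not Presto's RowExpression API — tuples stand in for expression nodes):

```python
# Toy illustration of flat vs. nested conjunctions. "AND" tuples are a
# hypothetical stand-in for Presto's row expressions, not the real SPI types.
from functools import reduce

def and_flat(filters):
    # One variadic AND node: (a AND b AND c)
    return ("AND", *filters)

def and_nested(filters):
    # Left-fold of binary ANDs: ((a AND b) AND c)
    return reduce(lambda left, right: ("AND", left, right), filters)

filters = ["a = 1", "b = 2", "c = 3"]
print(and_flat(filters))    # ('AND', 'a = 1', 'b = 2', 'c = 3')
print(and_nested(filters))  # ('AND', ('AND', 'a = 1', 'b = 2'), 'c = 3')

# Empty input is the case that forces the explicit check discussed above:
combined = None if not [] else and_flat([])  # i.e. Optional.empty() in the Java code
```

Both forms are logically equivalent; the flat form just keeps the expression tree shallow, and the empty-filters case still needs explicit handling either way.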

@jasonf20 jasonf20 force-pushed the equality-deletes-as-join-pr branch from d691a77 to b5bc884 Compare January 24, 2024 12:23
The current join nodes are in presto-main. Moving them requires moving a
lot of objects.

Since we don't need to allow the connectors to optimize the joins for now,
we can just add an adapter node that will be converted back to the
internal node type when returned by connector optimizers.
Support for $path and $data_sequence_number hidden columns.

The first is generally useful and the second is a requirement for
implementing equality deletes as a join.
The current equality delete implementation applies deletes at the split
level.

Since equality deletes often apply to a lot of files, the current
implementation ends up opening the delete files #splits * #delete_files
times.

This commit implements equality deletes as a join. A connector optimizer
is added to apply the appropriate join(s).
@jasonf20 jasonf20 force-pushed the equality-deletes-as-join-pr branch from b5bc884 to 2df0c25 Compare January 24, 2024 18:31
Contributor

@feilong-liu feilong-liu left a comment

Only reviewed the first commit and skipped the next two Iceberg-connector-specific commits, LGTM.

@tdcmeehan
Contributor

Thank you @jasonf20 for this very nice contribution, and thank you @feilong-liu @hantangwangd @steveburnett @aaneja @yingsu00 for the thorough reviews.

@tdcmeehan tdcmeehan merged commit 4b458d4 into prestodb:master Jan 24, 2024
@jasonf20 jasonf20 deleted the equality-deletes-as-join-pr branch January 25, 2024 13:17
@yingsu00
Contributor

@jasonf20 I saw you have another PR for Trino (IcebergPlugin: Performance improvements for Equality Delete files) which doesn't need to add joins. How do the two approaches compare? cc @tdcmeehan
