
Possible data corruption in "Skipping partial aggregation" change #11850

Closed · Tracked by #11476
andygrove opened this issue Aug 6, 2024 · 12 comments
Labels: bug (Something isn't working)

Comments

andygrove (Member) commented Aug 6, 2024

Describe the bug

When we upgrade DataFusion Comet to use a version of DataFusion that includes #11627, we see incorrect results for an aggregation in TPC-DS q97.

- q97 *** FAILED *** (11 seconds, 703 milliseconds)
  java.lang.Exception: Expected "53[7833        285408] 200", but got "53[8024  285422] 200" Result did not match

To Reproduce

Start with apache/datafusion-comet#783 and then upgrade DataFusion to include #11627. I do not have a repro for DataFusion yet.

Expected behavior

No response

Additional context

No response

@andygrove andygrove added the bug Something isn't working label Aug 6, 2024
@andygrove andygrove self-assigned this Aug 6, 2024
@andygrove (Member Author):

@korowa @alamb fyi - I am not sure how to reproduce this in a DataFusion example yet, but will try and do that

alamb (Contributor) commented Aug 6, 2024

Thanks @andygrove -- if you can get a repro we'll get it fixed asap

One bug I know that was present that we have fixed is #11833 / 1c98e6e -- not sure if you tried with that one

korowa (Contributor) commented Aug 6, 2024

The query is

WITH ssci AS (
  SELECT
    ss_customer_sk customer_sk,
    ss_item_sk item_sk
  FROM store_sales, date_dim
  WHERE ss_sold_date_sk = d_date_sk
    AND d_month_seq BETWEEN 1200 AND 1200 + 11
  GROUP BY ss_customer_sk, ss_item_sk),
    csci AS (
    SELECT
      cs_bill_customer_sk customer_sk,
      cs_item_sk item_sk
    FROM catalog_sales, date_dim
    WHERE cs_sold_date_sk = d_date_sk
      AND d_month_seq BETWEEN 1200 AND 1200 + 11
    GROUP BY cs_bill_customer_sk, cs_item_sk)
SELECT
  sum(CASE WHEN ssci.customer_sk IS NOT NULL AND csci.customer_sk IS NULL THEN 1 ELSE 0 END) store_only,
  sum(CASE WHEN ssci.customer_sk IS NULL AND csci.customer_sk IS NOT NULL THEN 1 ELSE 0 END) catalog_only,
  sum(CASE WHEN ssci.customer_sk IS NOT NULL AND csci.customer_sk IS NOT NULL THEN 1 ELSE 0 END) store_and_catalog
FROM ssci
  FULL OUTER JOIN csci ON (ssci.customer_sk = csci.customer_sk
    AND ssci.item_sk = csci.item_sk)
LIMIT 100

The last aggregation with SUM(CASE WHEN ...) is not a row hash aggregate and should not trigger this feature, and the other two don't use any accumulators, so, probably, the issue is not related to accumulator expressions (I'll recheck it anyway).

@andygrove is there a chance that Comet may use datafusion Partial aggregation results without finalizing the aggregation? In this case partial-aggs in both CTEs may produce duplicates, which should result in increased count values in the resultset.
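That failure mode can be sketched in plain Python (a hypothetical toy model, not DataFusion code; the function names are made up for illustration): once a Partial aggregate switches to pass-through mode mid-stream, the same group key can appear more than once in its combined output, and only a Final-mode merge restores distinctness.

```python
# Toy model (not DataFusion code): why consuming Partial aggregate output
# without a Final merge inflates counts once the "skip partial aggregation"
# optimization kicks in.

def partial_group_by(batches, skip_after=1):
    """Emit group keys per batch; after `skip_after` batches the operator
    'skips' aggregation and passes rows through, so a key already emitted
    can appear again in the combined output."""
    out = []
    seen = set()
    for i, batch in enumerate(batches):
        if i < skip_after:
            for key in batch:
                if key not in seen:
                    seen.add(key)
                    out.append(key)
        else:
            out.extend(batch)  # pass-through: duplicates possible
    return out

def final_group_by(keys):
    # Final mode merges duplicate partial groups back together
    return sorted(set(keys))

batches = [[1, 2], [2, 3]]
partial = partial_group_by(batches)
assert partial == [1, 2, 2, 3]            # key 2 leaks through twice
assert final_group_by(partial) == [1, 2, 3]  # Final mode restores distinctness
```

If a downstream operator (such as a join) consumes `partial` directly instead of `final_group_by(partial)`, the duplicate keys survive and inflate any counts computed from them.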

UPD: I've updated the "user-facing changes" section in #11627, and I suppose this PR also needs an api-change label, since it is a breaking change for downstream projects that use DataFusion operators directly.

@andygrove andygrove added the api change Changes the API exposed to users of the crate label Aug 6, 2024
@andygrove (Member Author):

This query works fine in DataFusion, but not in Comet. Comet does use both Partial and Final aggregates. I am working on debugging this to better understand where this is going wrong.

Perhaps there could be an option to disable this functionality?

alamb (Contributor) commented Aug 6, 2024

Perhaps there could be an option to disable this functionality?

Yes, you can set datafusion.execution.skip_partial_aggregation_probe_ratio_threshold to zero.

Config setting: https://datafusion.apache.org/user-guide/configs.html
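If helpful, that setting can be changed per-session with a SQL `SET` statement (the key and value are taken from the comment above; check the linked configs page for current names and defaults):

```sql
-- As suggested above; verify against the DataFusion configs page
SET datafusion.execution.skip_partial_aggregation_probe_ratio_threshold = 0;
```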

BTW (🎣 for reviews) I have some PRs that make it easier to see what is going on:

andygrove (Member Author) commented Aug 6, 2024

@korowa @alamb I have now confirmed that we need to disable this feature in Comet because this is not compatible with how Spark handles partial aggregates and the duplicates cause issues. Thanks for the guidance. I'll close this issue once I have confirmed that I can disable the feature.

alamb (Contributor) commented Aug 7, 2024

@andygrove if you have additional details about how spark handles partial aggregates, I would love to read about them

@andygrove andygrove removed the api change Changes the API exposed to users of the crate label Aug 7, 2024
andygrove (Member Author) commented Aug 7, 2024

@alamb Sure, here is one of the query stages after we have translated it to a DataFusion plan. Note that we are performing a join on the output of two partial aggregates and then applying the final aggregate after the join. Having duplicates on either input to the join causes extra rows to be generated in the join output.

Perhaps we'll need to start thinking about having a physical optimizer phase in Comet so that we can leverage the "skip partial aggregates" feature in some cases.

 ProjectionExec: expr=[sum@0 as col_0, sum@1 as col_1, sum@2 as col_2]
  AggregateExec: mode=Final, gby=[], aggr=[sum, sum, sum]
    AggregateExec: mode=Partial, gby=[], aggr=[sum, sum, sum]
      ProjectionExec: expr=[col_0@0 as col_0, col_0@2 as col_1]
        SortMergeJoin: join_type=Full, on=[(col_0@0, col_0@0), (col_1@1, col_1@1)]
          SortExec: expr=[col_0@0 ASC,col_1@1 ASC], preserve_partitioning=[false]
            CopyExec
              ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1]
                AggregateExec: mode=Partial, gby=[col_0@0 as col_0, col_1@1 as col_1], aggr=[]
                  ScanExec: schema=[col_0: Int32, col_1: Int32]
          SortExec: expr=[col_0@0 ASC,col_1@1 ASC], preserve_partitioning=[false]
            CopyExec
              ProjectionExec: expr=[col_0@0 as col_0, col_1@1 as col_1]
                AggregateExec: mode=Partial, gby=[col_0@0 as col_0, col_1@1 as col_1], aggr=[]
                  ScanExec: schema=[col_0: Int32, col_1: Int32]
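The effect of duplicates on a join input can be illustrated with a toy full outer join in plain Python (hypothetical, not the actual SortMergeJoin operator): a key duplicated on one side multiplies matched rows, which inflates any SUM computed over the join output, matching the kind of count drift seen in the q97 failure.

```python
# Toy model: a FULL OUTER JOIN over keys that should be distinct per side.
# A duplicate key leaked by a partial aggregate multiplies matched rows.

def full_outer_join(left, right):
    matched_right = set()
    rows = []
    for l in left:
        hits = [r for r in right if r == l]
        if hits:
            for r in hits:
                matched_right.add(r)
                rows.append((l, r))   # one output row per match
        else:
            rows.append((l, None))    # left-only row
    rows += [(None, r) for r in set(right) - matched_right]  # right-only rows
    return rows

store = [1, 2]              # distinct customer/item keys, as intended
catalog = [2, 3]
assert len(full_outer_join(store, catalog)) == 3      # 1-only, 2&2, 3-only

catalog_dup = [2, 2, 3]     # key 2 duplicated by a skipped partial agg
assert len(full_outer_join(store, catalog_dup)) == 4  # extra matched row
```

With the duplicate, the "store_and_catalog" style count doubles for key 2, so any SUM(CASE WHEN ...) over the join output over-counts even though the final aggregate itself is correct.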

@andygrove (Member Author):

Also worth noting that in this case, the partial aggregates in the join input have no aggregate expressions, just the group by. The output of the join goes through a partial/final aggregate pair, and we could potentially benefit from the "skip partial aggregate" feature in this case.

@andygrove (Member Author):

This is starting to feel like a bug again. I will try and create a repro today based on this join example.

@andygrove (Member Author):

I cannot repro in DataFusion via SQL because DataFusion creates a very different plan, so I will close this issue and explore implementing optimizer rules in Comet to handle this (but in the short term we will just set the threshold high to disable the feature).

alamb (Contributor) commented Aug 8, 2024

Perhaps we'll need to start thinking about having a physical optimizer phase in Comet so that we can leverage the "skip partial aggregates" feature in some cases.

Looking at the plans in #11850 (comment) I wonder if it could be because the top AggregateExec is trying to take advantage of sorted outputs but when aggregate goes into "skip partial agg mode" it no longer produces a sorted input stream or something 🤔
