Skip to content

[native] Error out on StreamingAggregation#22998

Closed
karteekmurthys wants to merge 1 commit intoprestodb:masterfrom
karteekmurthys:skip-stream-agg
Closed

[native] Error out on StreamingAggregation#22998
karteekmurthys wants to merge 1 commit intoprestodb:masterfrom
karteekmurthys:skip-stream-agg

Conversation

@karteekmurthys
Copy link
Copy Markdown
Contributor

@karteekmurthys karteekmurthys commented Jun 13, 2024

Description

This change disables StreamingAggregation by default. Refer this issue #22585. Prestissimo creates a StreamingAgg when there are pregrouped keys in the AggregationNode. This assumption breaks when the source to the Aggregation is a NestedLoopJoin.

Test Plan

Migrated the Presto tests related correlated subqueries under presto-native.

Contributor checklist

  • Please make sure your submission complies with our development, formatting, commit message, and attribution guidelines.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

General Changes
* ... :pr:`12345`
* ... :pr:`12345`

Hive Connector Changes
* ... :pr:`12345`
* ... :pr:`12345`

If release note is NOT required, use:

== NO RELEASE NOTE ==

@karteekmurthys karteekmurthys requested a review from a team as a code owner June 13, 2024 02:07
@karteekmurthys karteekmurthys marked this pull request as draft June 13, 2024 02:07
Copy link
Copy Markdown
Contributor

@mbasmanova mbasmanova left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel the fix belongs to Presto's optimizer. Somewhere it has an assumption that LEFT JOIN preserves the order of probe-size rows. This assumption is not true for non-equi LEFT JOINs in Native Execution. It would be nice to find that place and add a check there for native execution.

@aditi-pandit
Copy link
Copy Markdown
Contributor

@karteekmurthys : Can you add a test ? We could add the query with incorrect results we were debugging.

@aditi-pandit
Copy link
Copy Markdown
Contributor

aditi-pandit commented Jun 13, 2024

I feel the fix belongs to Presto's optimizer. Somewhere it has an assumption that LEFT JOIN preserves the order of probe-size rows. This assumption is not true for non-equi LEFT JOINs in Native Execution. It would be nice to find that place and add a check there for native execution.

@mbasmanova : We are working to track down that code. The concern is that if there are multiple places which need correction then it could take a while.

But at present we are getting incorrect results so this is a severe issue.

For the interim we want to add this change to avoid the incorrect results. This is a very safe change as we will guarantee a safe Velox plan. Since the plan conversion decides the join type, we can avoid generating a wrong Velox plan which sets pre-grouping as well.

@karteekmurthys
Copy link
Copy Markdown
Contributor Author

karteekmurthys commented Jun 13, 2024

@karteekmurthys : Can you add a test ? We could add the query with incorrect results we were debugging.

Yes, I am working on it. The PR is not ready yet. I will add a test in PlanConverterTest.cpp

@karteekmurthys
Copy link
Copy Markdown
Contributor Author

@aditi-pandit Please review, ported the tests.

@mbasmanova
Copy link
Copy Markdown
Contributor

@karteekmurthys @aditi-pandit Folks, please, reconsider this fix. Presto-to-Velox plan conversion is not the right place for it. Do take the time to debug fully and identify a place in the optimizer that needs to be made conditional on native execution. Thanks.

@aditi-pandit
Copy link
Copy Markdown
Contributor

aditi-pandit commented Jun 14, 2024

@karteekmurthys @aditi-pandit Folks, please, reconsider this fix. Presto-to-Velox plan conversion is not the right place for it. Do take the time to debug fully and identify a place in the optimizer that needs to be made conditional on native execution. Thanks.

@mbasmanova : I agree that it would be great to disable the optimizer logic. But Im curious why you have concerns about this code. This code saves the engine from itself. We have incorrect results right now which is a very severe problem. Even if we fix the optimizer code, we might still want to retain this code as it catches a big vulnerability. Its possible that in the future someone could still write optimizer code making that assumption, and it would need to be caught in code review by someone who understands native engine as well. There aren't many engineers who understand both pieces.

If the concern is that we will lose track on the optimizer fix, I assure you we are not closing this thread. We are looking at the optimizer code.

@mbasmanova
Copy link
Copy Markdown
Contributor

@aditi-pandit Aditi, indeed, correctness issues are very important. We must ensure Prestissimo returns correct results.

The problem with this change is that it adds a one-off logic to Presto-to-Velox plan translation that attempts to "fix" an optimizer bug. This logic is very limited and hard to reason about.

To prevent against future breakages one needs to invest in a robust suite of tests. Specifically, we should add a test that fails without the changes.

Copy link
Copy Markdown
Contributor

@mbasmanova mbasmanova left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comments above.

@aditi-pandit
Copy link
Copy Markdown
Contributor

aditi-pandit commented Jun 15, 2024

@mbasmanova : Was looking at the optimizer code bit more today. I was able to narrow down the issue to the addLocalExchanges rule is (see explanation at #22585 (comment))

Do you have any suggestions ?

@aditi-pandit
Copy link
Copy Markdown
Contributor

@karteekmurthys : Please can you rebase this PR. We are considering submitting this as Meta encountered this issue in production as well.

VELOX_UNSUPPORTED("Unknown join type");
}

inline bool isNestedLoopJoin(core::JoinType joinType) {
Copy link
Copy Markdown
Contributor

@aditi-pandit aditi-pandit Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karteekmurthys : You have to check for node->criteria.empty() here.

std::dynamic_pointer_cast<const protocol::AggregationNode>(node)) {
return toVeloxQueryPlan(aggregation, tableWriteInfo, taskId);
bool isStreamingAggAllowed = true;
if (auto joinNode = std::dynamic_pointer_cast<protocol::JoinNode>(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what if nested loop join is not a direct source, but is in the plan tree (e.g. there's a project node in between)?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rschlussel : We saw this property set by the optimizer in aggregation over nested loop join via StreamPropertyDerivations #23315. But you are right that the ideal fix should look for all nodes that can carry forward projections of the grouping keys.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My question is what if you have like so

aggregation
|
filter or identity project (any node that propagates the properties of its input)
|
nested loop join
|
something sorted

the nested loop join will mess up the sortedness, but we won't catch it with this code because it's not a direct source of the aggregation node. so we will still have a correctness bug.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rschlussel : Yes, the ideal fix should check for any nested loop join between aggregation and the TableScan it is originating from. I believe @karteekmurthys is going to change this code.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rschlussel I am trying to see if I can do a recursive search for a nestedloopjoin node and if present skip streaming.


inline bool isNestedLoopJoin(const std::shared_ptr<protocol::JoinNode> node) {
auto joinType = toJoinType(node->type);
return node->criteria.empty() || core::isInnerJoin(joinType) ||
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karteekmurthys The condition is just node->criteria.empty(). The rest of the clauses catch all joinTypes. Unless I am missing something.

@karteekmurthys karteekmurthys force-pushed the skip-stream-agg branch 2 times, most recently from 28c6c58 to 285f553 Compare July 31, 2024 19:03
@amitkdutta amitkdutta changed the title [Native] Avoid streaming aggregation when source is NestedLoopJoin [native] Avoid streaming aggregation when source is NestedLoopJoin Jul 31, 2024
if (auto aggregation =
std::dynamic_pointer_cast<const protocol::AggregationNode>(node)) {
return toVeloxQueryPlan(aggregation, tableWriteInfo, taskId);
bool isStreamingAggAllowed = true;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karteekmurthys Nit : Abstract this into a function.

Please also add a comment about why this check is required.

@karteekmurthys karteekmurthys changed the title [native] Avoid streaming aggregation when source is NestedLoopJoin [Do Not review] [native] Avoid streaming aggregation when source is NestedLoopJoin Jul 31, 2024
while (parent.get()) {
if (auto joinNode =
std::dynamic_pointer_cast<protocol::JoinNode>(parent)) {
isStreamingAggAllowed = !joinNode->criteria.empty();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you aren't exiting the loop early, then you need to && this with isStreamingAggAllowed so you don't overwrite a false value with a true value (e.g. you have a join on top of a join, and only the top one is nestedLoopJoin)

Copy link
Copy Markdown
Contributor

@rschlussel rschlussel Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Edit: sorry misread the code, I see you are exiting early, but you should only exit if it's false. Otherwise you should continue looping in case there are nested joins.

Copy link
Copy Markdown
Contributor Author

@karteekmurthys karteekmurthys Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rschlussel I am still working on this. The source needs to be moved to the PlanNode and I cannot edit the presto_protocol.h directly for that. Java files under presto_spi is the source for these structs. I am still figuring out best way to check the ancestors.

Copy link
Copy Markdown
Contributor

@aditi-pandit aditi-pandit Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rschlussel, @karteekmurthys : This fix can be simplified since we only allow Project, Filter or Limit, DistinctLimit in between Agg and Join. Streaming should immediately return false for any other node in between (e.g. we don't want an OrderBy in between).

Think Karteek is changing to that.

But if this doesn't wrap up in the next round, we can do with never setting pre-grouping.

@rschlussel
Copy link
Copy Markdown
Contributor

Can we do a quick fix where we disable streaming aggregation for all cases (even if there's not a nested loop join, so we can quickly mitigate the correctness bug, and then we can iterate to limit the scope to just nested loop joins, or the optimizer change)?

namespace facebook::presto::protocol {
struct PlanNode : public JsonEncodedSubclass {
PlanNodeId id = {};
std::shared_ptr<PlanNode> source = {};
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karteekmurthys : Don't think you can change this without java side changes.

if (auto aggregation =
std::dynamic_pointer_cast<const protocol::AggregationNode>(node)) {
return toVeloxQueryPlan(aggregation, tableWriteInfo, taskId);
auto parent = aggregation->source;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karteekmurthys :
Nit : parent is not needed.

You can simply change line 990 to streamable = false (leaving a comment). There isn't a need to pass a variable from this point

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@karteekmurthys karteekmurthys force-pushed the skip-stream-agg branch 2 times, most recently from dc635b9 to 11e82c6 Compare July 31, 2024 22:13
@karteekmurthys karteekmurthys changed the title [Do Not review] [native] Avoid streaming aggregation when source is NestedLoopJoin [native] Avoid streaming aggregation when source is NestedLoopJoin Jul 31, 2024
bool streamable = !node->preGroupedVariables.empty() &&
node->groupingSets.groupingSetCount == 1 &&
node->groupingSets.globalGroupingSets.empty();
/// TODO karteekmurthys@ Re-enable after
Copy link
Copy Markdown
Contributor

@aditi-pandit aditi-pandit Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karteekmurthys : Nit : There isn't a need to surface this in external documentation.

Just using // is fine.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@aditi-pandit aditi-pandit marked this pull request as ready for review July 31, 2024 22:24
@karteekmurthys karteekmurthys changed the title [native] Avoid streaming aggregation when source is NestedLoopJoin [native] Disable StreamingAggregation Jul 31, 2024
aditi-pandit
aditi-pandit previously approved these changes Jul 31, 2024
Copy link
Copy Markdown
Contributor

@aditi-pandit aditi-pandit left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

node->groupingSets.globalGroupingSets.empty();
// TODO karteekmurthys@ Re-enable after
// fixing:https://github.com/prestodb/presto/issues/22585
bool streamable = false;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this safe? If planner planned streaming agg it means it has some assumptions that are wrong. Failing the query might be safer.

Copy link
Copy Markdown
Contributor

@aditi-pandit aditi-pandit Jul 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mbasmanova :

Presto planner doesn't really specify streaming agg. The streaming agg choice is made in Velox planner based on preGrouped variables https://github.com/facebookincubator/velox/blob/main/velox/exec/LocalPlanner.cpp#L499

streamable just sets preGroupedVariables for Velox AggregationNode in PrestoToVeloxQueryPlan.

https://github.com/prestodb/presto/blob/master/presto-native-execution/presto_cpp/main/types/PrestoToVeloxQueryPlan.cpp#L1010

So I think blanking out preGrouped variables is sufficient for letting the aggregation proceed with HashAggregation.

Let me know if you think otherwise.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aditi-pandit Aditi, you are correct that replacing streaming agg with hash agg is safe (just less efficient). However, my concern is that there could be other changes to the plan made by the optimizer based on wrong assumptions that led it to specify pre-grouped fields "incorrectly". In general, it is not safe to "question" the optimizer. However, here we say that we've seen cases where agg with pre-grouped keys was a result of optimizer making incorrect assumptions, hence, we are not following that choice. What I'm saying is that if part of the plan cannot be trusted, I don't think we can trust the rest of the plan. Hence, I'm saying that it would be safer to reject the whole plan and fail.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mbasmanova : The pre-Grouping seemed broken only with nestedLoopJoin, but okay otherwise.

The current fix to disable the streaming agg always is also aggressive to avoid fine-tuning plan patterns for aggregations over Nested Loop joins.

I feel rejecting the whole plan would be super conservative. But if you want that behavior in Meta production I won't question it. We will likely change the behavior internally at IBM.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that a proper fix will be ready in Velox soon, so hopefully no Presto mitigation will be needed at all. But FWIW, I don't think there are other things relying on the pregrouped aggregation variables, so I think that aspect of just converting to a regular aggregation that ignores the pregrouped variables is pretty safe for a short term fix (wouldn't rely on it longer term because of course things can change).

The riskier part that neither PR addresses is whether other optimizations unrelated to aggregation are also depending on nested loop join maintaining the ordering of the input. Would we plan a merge join after a nested loop join because we think the data is ordered? Or drop an order by after a nested loop join because we think the data is already ordered? If we'd like to be conservative/thorough, the safest thing to do is rather than specifically handling aggregation, fail all queries that are using NestedLoopJoin, which is the source of the incorrect optimization.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@karteekmurthys Just discussed with @aditi-pandit. We agreed to fail the queries in this case, similar to what @rschlussel and @mbasmanova suggested. Lets change it to throw an exception instead of disabling streaming aggregation.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We currently use NestedLoopJoin when there is no Join criteria sepcified. If we totally disable it, we don't have alternative way to implement this situation.

if (node->criteria.empty()) {
    const bool isNestedJoinType = core::isInnerJoin(joinType) ||
        core::isLeftJoin(joinType) || core::isRightJoin(joinType) ||
        core::isFullJoin(joinType);
    if (isNestedJoinType) {
      return std::make_shared<core::NestedLoopJoinNode>(
          node->id,
          joinType,
          node->filter ? exprConverter_.toVeloxExpr(*node->filter) : nullptr,
          toVeloxQueryPlan(node->left, tableWriteInfo, taskId),
          toVeloxQueryPlan(node->right, tableWriteInfo, taskId),
          toRowType(node->outputVariables, typeParser_));
    }
    VELOX_UNSUPPORTED(
        "JoinNode has empty criteria that cannot be "
        "satisfied by NestedJoinNode or HashJoinNode");
  }

@mbasmanova Can you confirm what is the consequence of disabling Nestedloopjoin for Meta's workloads?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we fail for this case it means that a small percentage of queries will start to fail in our native workers and need to run in Java clusters (and even though it is a small percentage, you are right that it's still a much larger impact than just failing for streaming aggregations, and it's not impact we want to take on lightly). still we think it's the right think to do for the short term because even though most nested loop join queries are probably fine, we can't guarantee the correctness of queries that have nested loop joins in them.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rschlussel : NLJ is used for any join where there isn't any equality condition between join columns. Sounds like you have have very little traffic for this.

@karteekmurthys : We should do the conservative approach in the open source project. We will have to internally enable this for us at IBM separately. Please follow up with Ethan for that.

@karteekmurthys karteekmurthys changed the title [native] Disable StreamingAggregation [native] Error out on StreamingAggregation Aug 1, 2024

// TODO karteekmurthys@ Re-enable after
// fixing:https://github.com/prestodb/presto/issues/22585
if (streamable) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for all the back and forth. instead of failing in plan conversion for aggregation as you are doing here, can we fail in plan conversion for join if it's a nested loop join? That would be a more robust mitigation because it's the root cause of the issue, and there are other optimizations aside from streaming aggregation that might be assuming nested loop join is preserving order when it's not e.g. we might remove an order by because we think the input is already ordered or plan a merge join because we think the input is ordered.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI I am turning off NLJ here : #23341 .

@pedroerp
Copy link
Copy Markdown
Contributor

pedroerp commented Aug 2, 2024

This PR should address the NLJ issue in Velox: facebookincubator/velox#10651

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants