From d0d441ea055a0bd0a4889cd9b2c25e7c6f8b09ba Mon Sep 17 00:00:00 2001 From: Andrew Xie Date: Fri, 23 May 2025 15:40:09 -0700 Subject: [PATCH] Fix inconsistent ordering with offset and limit --- .../optimizations/AddLocalExchanges.java | 3 +- .../sql/planner/planPrinter/PlanPrinter.java | 9 ++++++ .../sql/planner/TestLogicalPlanner.java | 29 +++++++++++++++++++ .../presto/tests/AbstractTestQueries.java | 18 ++++++++++++ 4 files changed, 57 insertions(+), 2 deletions(-) diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java index 52baa68d194bd..d0f15ab3f08dc 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java @@ -307,11 +307,10 @@ public PlanWithProperties visitLimit(LimitNode node, StreamPreferredProperties p } // final limit requires that all data be in one stream - // also, a final changes the input organization completely, so we do not pass through parent preferences return planAndEnforceChildren( node, singleStream(), - defaultParallelism(session)); + parentPreferences.withDefaultParallelism(session)); } @Override diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java index 8de8906b60d60..3036b8e5815e9 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/planner/planPrinter/PlanPrinter.java @@ -91,6 +91,7 @@ import com.facebook.presto.sql.planner.plan.IndexJoinNode; import com.facebook.presto.sql.planner.plan.InternalPlanVisitor; import com.facebook.presto.sql.planner.plan.LateralJoinNode; +import com.facebook.presto.sql.planner.plan.OffsetNode; import com.facebook.presto.sql.planner.plan.RemoteSourceNode; import com.facebook.presto.sql.planner.plan.RowNumberNode; import com.facebook.presto.sql.planner.plan.SampleNode; @@ -1308,6 +1309,14 @@ public Void visitLateralJoin(LateralJoinNode node, Void context) return processChildren(node, context); } + @Override + public Void visitOffset(OffsetNode node, Void context) + { + addNode(node, "Offset", format("[%s]", node.getCount())); + + return processChildren(node, context); + } + @Override public Void visitPlan(PlanNode node, Void context) { diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java index 13385eb45b2e5..509eebb559a61 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/planner/TestLogicalPlanner.java @@ -1589,6 +1589,35 @@ public void testOffset() .withAlias("row_num", new RowNumberSymbolMatcher()))))); } + @Test + public void testOffsetWithLimit() + { + Session enableOffsetWithConcurrency = Session.builder(this.getQueryRunner().getDefaultSession()) + .setSystemProperty(OFFSET_CLAUSE_ENABLED, "true") + .setSystemProperty("task_concurrency", "2") // task_concurrency > 1 required to add possible local exchanges that fail this test for incorrect AddLocalExchanges + .build(); + + assertPlanWithSession("SELECT totalprice FROM orders ORDER BY totalprice OFFSET 1 LIMIT 512", + enableOffsetWithConcurrency, + false, + any( + strictProject( + ImmutableMap.of("totalprice", new ExpressionMatcher("totalprice")), + limit( + 512, + filter( + "row_num > BIGINT '1'", + rowNumber( + pattern -> pattern + .partitionBy(ImmutableList.of()), + anyTree( + sort( + ImmutableList.of(sort("totalprice", ASCENDING, LAST)), + any( + tableScan("orders", ImmutableMap.of("totalprice", "totalprice")))))) + .withAlias("row_num", new RowNumberSymbolMatcher())))))); + } + private Session noJoinReordering() { return Session.builder(this.getQueryRunner().getDefaultSession()) diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueries.java b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueries.java index fd17cd6544c8d..91be901f4d1b9 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueries.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueries.java @@ -1105,6 +1105,24 @@ public void testOffsetEmptyResult() assertQueryReturnsEmptyResult(localSession, "SELECT name FROM nation ORDER BY regionkey OFFSET 100 ROWS LIMIT 20"); } + @Test + public void testOffsetLimitOrderByConsistency() + { + Session localSession = Session.builder(getSession()) + .setSystemProperty(OFFSET_CLAUSE_ENABLED, "true") + .build(); + + String query = "SELECT name FROM customer ORDER BY name OFFSET 1 LIMIT 256"; + MaterializedResult expectedResults = computeActual(localSession, query).toTestTypes(); + List expectedRows = expectedResults.getMaterializedRows(); + + for (int i = 0; i < 5; i++) { + MaterializedResult actualResults = computeActual(localSession, query).toTestTypes(); + List actualRows = actualResults.getMaterializedRows(); + assertEquals(actualRows, expectedRows, "Mismatched results on run " + i); + } + } + @Test public void testRepeatedAggregations() {