Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,10 @@ public PlanWithProperties visitLimit(LimitNode node, StreamPreferredProperties p
}

// final limit requires that all data be in one stream
// also, a final changes the input organization completely, so we do not pass through parent preferences
return planAndEnforceChildren(
node,
singleStream(),
defaultParallelism(session));
parentPreferences.withDefaultParallelism(session));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import com.facebook.presto.sql.planner.plan.IndexJoinNode;
import com.facebook.presto.sql.planner.plan.InternalPlanVisitor;
import com.facebook.presto.sql.planner.plan.LateralJoinNode;
import com.facebook.presto.sql.planner.plan.OffsetNode;
import com.facebook.presto.sql.planner.plan.RemoteSourceNode;
import com.facebook.presto.sql.planner.plan.RowNumberNode;
import com.facebook.presto.sql.planner.plan.SampleNode;
Expand Down Expand Up @@ -1308,6 +1309,14 @@ public Void visitLateralJoin(LateralJoinNode node, Void context)
return processChildren(node, context);
}

@Override
public Void visitOffset(OffsetNode node, Void context)
{
addNode(node, "Offset", format("[%s]", node.getCount()));

return processChildren(node, context);
}

@Override
public Void visitPlan(PlanNode node, Void context)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,35 @@ public void testOffset()
.withAlias("row_num", new RowNumberSymbolMatcher())))));
}

@Test
public void testOffsetWithLimit()
{
Session enableOffsetWithConcurrency = Session.builder(this.getQueryRunner().getDefaultSession())
.setSystemProperty(OFFSET_CLAUSE_ENABLED, "true")
.setSystemProperty("task_concurrency", "2") // task_concurrency > 1 required to add possible local exchanges that fail this test for incorrect AddLocalExchanges
.build();

assertPlanWithSession("SELECT totalprice FROM orders ORDER BY totalprice OFFSET 1 LIMIT 512",
enableOffsetWithConcurrency,
false,
any(
strictProject(
ImmutableMap.of("totalprice", new ExpressionMatcher("totalprice")),
limit(
512,
filter(
"row_num > BIGINT '1'",
rowNumber(
pattern -> pattern
.partitionBy(ImmutableList.of()),
anyTree(
sort(
ImmutableList.of(sort("totalprice", ASCENDING, LAST)),
any(
tableScan("orders", ImmutableMap.of("totalprice", "totalprice"))))))
.withAlias("row_num", new RowNumberSymbolMatcher()))))));
}

private Session noJoinReordering()
{
return Session.builder(this.getQueryRunner().getDefaultSession())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1105,6 +1105,24 @@ public void testOffsetEmptyResult()
assertQueryReturnsEmptyResult(localSession, "SELECT name FROM nation ORDER BY regionkey OFFSET 100 ROWS LIMIT 20");
}

@Test
public void testOffsetLimitOrderByConsistency()
{
Session localSession = Session.builder(getSession())
.setSystemProperty(OFFSET_CLAUSE_ENABLED, "true")
.build();

String query = "SELECT name FROM customer ORDER BY name OFFSET 1 LIMIT 256";
MaterializedResult expectedResults = computeActual(localSession, query).toTestTypes();
List<MaterializedRow> expectedRows = expectedResults.getMaterializedRows();

for (int i = 0; i < 5; i++) {
MaterializedResult actualResults = computeActual(localSession, query).toTestTypes();
List<MaterializedRow> actualRows = actualResults.getMaterializedRows();
assertEquals(actualRows, expectedRows, "Mismatched results on run " + i);
}
}

@Test
public void testRepeatedAggregations()
{
Expand Down
Loading