Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public class FeaturesConfig
private DataSize filterAndProjectMinOutputPageSize = DataSize.of(500, KILOBYTE);
Copy link
Copy Markdown
Member

@sopel39 sopel39 Feb 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even if the number of rows after pushdown is smaller than without pushdown, it could significantly increase the CPU overhead of the underlying source (table scans might be much cheaper than joins). I think it would be great to determine what the impact of pushdown on the underlying connectors is. It could be that join pushdown is beneficial only when joins are very non-selective and users don't want the CPU usage of the underlying connector to increase significantly.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Yet I would assume that you will still be able to disable pushdown at the per-connector level in configuration, as well as per query using the session.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Totally -- #6874 provides both catalog level config and session toggle.

private int filterAndProjectMinOutputPageRowCount = 256;
private int maxGroupingSets = 2048;
private JoinPushdownMode joinPushdownMode = JoinPushdownMode.DISABLED;
private JoinPushdownMode joinPushdownMode = JoinPushdownMode.AUTOMATIC;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see conversation about code level documentation in the other pr

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added comment as a separate commit before introducing AUTOMATIC mode.


public enum JoinReorderingStrategy
{
Expand Down Expand Up @@ -179,9 +179,12 @@ public enum JoinPushdownMode
* Try to push all joins except cross-joins to connector.
*/
EAGER,
// TODO Add cost based logic to join pushdown
// AUTOMATIC,
/**/;
/**
* Automatically determine whether to push the join to the connector, based on table statistics.
* Do not push the join down in the absence of table statistics.
*/
AUTOMATIC,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is safe to make it the default

/**/
}

public double getCpuCostWeight()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.Session;
import io.trino.cost.PlanNodeStatsEstimate;
import io.trino.matching.Capture;
import io.trino.matching.Captures;
import io.trino.matching.Pattern;
Expand All @@ -32,6 +33,7 @@
import io.trino.sql.ExpressionUtils;
import io.trino.sql.analyzer.FeaturesConfig.JoinPushdownMode;
import io.trino.sql.planner.Symbol;
import io.trino.sql.planner.TypeProvider;
import io.trino.sql.planner.iterative.Rule;
import io.trino.sql.planner.plan.JoinNode;
import io.trino.sql.planner.plan.Patterns;
Expand Down Expand Up @@ -62,6 +64,7 @@
import static io.trino.sql.planner.plan.Patterns.Join.left;
import static io.trino.sql.planner.plan.Patterns.Join.right;
import static io.trino.sql.planner.plan.Patterns.tableScan;
import static java.lang.Double.isNaN;
import static java.util.Objects.requireNonNull;

public class PushJoinIntoTableScan
Expand Down Expand Up @@ -114,6 +117,10 @@ public Result apply(JoinNode joinNode, Captures captures, Context context)
return Result.empty();
}

if (skipJoinPushdownBasedOnCost(joinNode, context)) {
return Result.empty();
}

Map<String, ColumnHandle> leftAssignments = left.getAssignments().entrySet().stream()
.collect(toImmutableMap(entry -> entry.getKey().getName(), Map.Entry::getValue));

Expand Down Expand Up @@ -162,6 +169,43 @@ public Result apply(JoinNode joinNode, Captures captures, Context context)
return Result.ofPlanNode(new TableScanNode(joinNode.getId(), handle, joinNode.getOutputSymbols(), newAssignments.build(), newEnforcedConstraint, false));
}

/**
 * Decides whether pushing the given join into the connector should be skipped, based on
 * estimated output sizes.
 *
 * The check only applies when the session's join pushdown mode is
 * {@link JoinPushdownMode#AUTOMATIC}; for any other mode it never vetoes the pushdown.
 *
 * @param joinNode the join being considered for pushdown
 * @param context rule context supplying the session, the symbol types and the stats provider
 * @return {@code true} when pushdown should be skipped, i.e. when the output size estimate of the
 * left source, the right source or the join itself is NaN, or when the estimated join output
 * size is greater than the left and right output sizes combined; {@code false} otherwise
 */
private boolean skipJoinPushdownBasedOnCost(JoinNode joinNode, Context context)
{
// Only the AUTOMATIC mode is gated by this size comparison.
if (getJoinPushdownMode(context.getSession()) != JoinPushdownMode.AUTOMATIC) {
return false;
}

TypeProvider types = context.getSymbolAllocator().getTypes();

// returning as quickly as possible to avoid unnecessary, costly work
// (stats are requested one node at a time, and the first NaN size ends the check)

PlanNodeStatsEstimate leftStats = context.getStatsProvider().getStats(joinNode.getLeft());
double leftOutputSize = leftStats.getOutputSizeInBytes(joinNode.getLeft().getOutputSymbols(), types);
// NaN size: presumably statistics are unavailable for the left source, so do not push down
if (isNaN(leftOutputSize)) {
return true;
}

PlanNodeStatsEstimate rightStats = context.getStatsProvider().getStats(joinNode.getRight());
double rightOutputSize = rightStats.getOutputSizeInBytes(joinNode.getRight().getOutputSymbols(), types);
// same treatment for the right source
if (isNaN(rightOutputSize)) {
return true;
}

PlanNodeStatsEstimate joinStats = context.getStatsProvider().getStats(joinNode);
double joinOutputSize = joinStats.getOutputSizeInBytes(joinNode.getOutputSymbols(), types);
// same treatment for the join output itself
if (isNaN(joinOutputSize)) {
return true;
}

// Skip pushdown when the join result is estimated to be larger than both inputs together.
if (joinOutputSize > leftOutputSize + rightOutputSize) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider adding some factor here, e.g pushed down join should produce 2x less rows than in trino. Such factor might need to be empirically established

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so you mean to replace left + right with max(left, right) * 0.5? Works for me, given that the current formula is not very scientificly determined.
I think we should do "something reasonable" & iterate.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah - I find initial value of a factor 1.0 as good as 0.5

// This is poor man's estimation if it makes more sense to perform join in source database or Trino.
// The assumption here is that cost of performing join in source database is less than or equal to cost of join in Trino.
// We resolve tie for pessimistic case (both join costs equal) on cost of sending the data from source database to Trino.
return true;
}
return false;
}

private TupleDomain<ColumnHandle> deriveConstraint(TupleDomain<ColumnHandle> sourceConstraint, Map<ColumnHandle, ColumnHandle> columnMapping, boolean nullable)
{
TupleDomain<ColumnHandle> constraint = sourceConstraint;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.trino.connector.MockConnectorColumnHandle;
import io.trino.connector.MockConnectorFactory;
import io.trino.connector.MockConnectorTableHandle;
import io.trino.cost.PlanNodeStatsEstimate;
import io.trino.metadata.TableHandle;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ColumnMetadata;
Expand All @@ -34,8 +35,10 @@
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.TupleDomain;
import io.trino.sql.planner.Symbol;
import io.trino.sql.planner.iterative.rule.test.RuleAssert;
import io.trino.sql.planner.iterative.rule.test.RuleTester;
import io.trino.sql.planner.plan.JoinNode;
import io.trino.sql.planner.plan.PlanNodeId;
import io.trino.sql.planner.plan.TableScanNode;
import io.trino.sql.tree.ArithmeticBinaryExpression;
import io.trino.sql.tree.ComparisonExpression;
Expand All @@ -47,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.function.Predicate;

import static com.google.common.base.Predicates.equalTo;
Expand Down Expand Up @@ -579,6 +583,164 @@ public void testPushJoinIntoTableRequiresFullColumnHandleMappingInResult()
}
}

/**
 * Checks the decision made by {@link PushJoinIntoTableScan} when the {@code join_pushdown}
 * session property is set to {@code AUTOMATIC}.
 * <p>
 * Row counts for the plan nodes with ids {@code "left"}, {@code "right"} and {@code "join"} are
 * injected through overridden stats; an empty {@link OptionalDouble} is turned into a {@code NaN}
 * row count. The rule is then expected either to replace the join with a single table scan or
 * not to fire at all.
 *
 * @param leftRows row count reported for the left table scan, empty for unknown
 * @param rightRows row count reported for the right table scan, empty for unknown
 * @param joinRows row count reported for the join, empty for unknown
 * @param pushdownExpected whether the rule is expected to push the join into the table scan
 */
@Test(dataProvider = "testAutomaticJoinPushDownParams")
public void testAutomaticJoinPushDown(OptionalDouble leftRows, OptionalDouble rightRows, OptionalDouble joinRows, boolean pushdownExpected)
{
    Session pushdownAutomaticSession = Session.builder(MOCK_SESSION)
            .setSystemProperty("join_pushdown", "AUTOMATIC")
            .build();

    // Stats handed to the rule; only row counts are provided, no per-symbol statistics
    PlanNodeStatsEstimate leftStats = new PlanNodeStatsEstimate(leftRows.orElse(Double.NaN), ImmutableMap.of());
    PlanNodeStatsEstimate rightStats = new PlanNodeStatsEstimate(rightRows.orElse(Double.NaN), ImmutableMap.of());
    PlanNodeStatsEstimate joinStats = new PlanNodeStatsEstimate(joinRows.orElse(Double.NaN), ImmutableMap.of());

    try (RuleTester ruleTester = defaultRuleTester()) {
        // The mock connector accepts the pushdown, after verifying what it was asked to join
        MockConnectorFactory connectorFactory = createMockConnectorFactory((session, applyJoinType, left, right, joinConditions, leftAssignments, rightAssignments) -> {
            assertThat(((MockConnectorTableHandle) left).getTableName()).isEqualTo(TABLE_A_SCHEMA_TABLE_NAME);
            assertThat(((MockConnectorTableHandle) right).getTableName()).isEqualTo(TABLE_B_SCHEMA_TABLE_NAME);
            Assertions.assertThat(applyJoinType).isEqualTo(toSpiJoinType(JoinNode.Type.INNER));
            Assertions.assertThat(joinConditions).containsExactly(new JoinCondition(JoinCondition.Operator.EQUAL, COLUMN_A1_VARIABLE, COLUMN_B1_VARIABLE));

            return Optional.of(new JoinApplicationResult<>(
                    JOIN_CONNECTOR_TABLE_HANDLE,
                    JOIN_TABLE_A_COLUMN_MAPPING,
                    JOIN_TABLE_B_COLUMN_MAPPING));
        });

        ruleTester.getQueryRunner().createCatalog(MOCK_CATALOG, connectorFactory, ImmutableMap.of());

        RuleAssert ruleAssert = ruleTester.assertThat(new PushJoinIntoTableScan(ruleTester.getMetadata()))
                .overrideStats("left", leftStats)
                .overrideStats("right", rightStats)
                .overrideStats("join", joinStats)
                .on(p -> {
                    Symbol columnA1Symbol = p.symbol(COLUMN_A1);
                    Symbol columnA2Symbol = p.symbol(COLUMN_A2);
                    Symbol columnB1Symbol = p.symbol(COLUMN_B1);
                    // The plan node ids must match the keys used in overrideStats above
                    TableScanNode left = new TableScanNode(
                            new PlanNodeId("left"),
                            TABLE_A_HANDLE,
                            ImmutableList.of(columnA1Symbol, columnA2Symbol),
                            ImmutableMap.of(
                                    columnA1Symbol, COLUMN_A1_HANDLE,
                                    columnA2Symbol, COLUMN_A2_HANDLE),
                            TupleDomain.all(),
                            false);

                    TableScanNode right = new TableScanNode(
                            new PlanNodeId("right"),
                            TABLE_B_HANDLE,
                            ImmutableList.of(columnB1Symbol),
                            ImmutableMap.of(columnB1Symbol, COLUMN_B1_HANDLE),
                            TupleDomain.all(),
                            false);

                    return join(new PlanNodeId("join"), JoinNode.Type.INNER, left, right, new JoinNode.EquiJoinClause(columnA1Symbol, columnB1Symbol));
                })
                .withSession(pushdownAutomaticSession);

        if (pushdownExpected) {
            ruleAssert.matches(tableScan(JOIN_PUSHDOWN_SCHEMA_TABLE_NAME.getTableName()));
        }
        else {
            ruleAssert.doesNotFire();
        }
    }
}

/**
 * Supplies the cases for {@code testAutomaticJoinPushDown}.
 * <p>
 * Each row holds: left row count, right row count, join row count, and whether the join is
 * expected to be pushed down. An empty {@link OptionalDouble} stands for an unknown row count.
 */
@DataProvider
public static Object[][] testAutomaticJoinPushDownParams()
{
    OptionalDouble unknown = OptionalDouble.empty();
    OptionalDouble leftRows = OptionalDouble.of(100);
    OptionalDouble rightRows = OptionalDouble.of(200);

    Object[][] cases = {
            {leftRows, rightRows, OptionalDouble.of(133), true},
            {leftRows, rightRows, OptionalDouble.of(134), false}, // just above output size boundary
            // any unknown row count (left, right or join) prevents the pushdown
            {unknown, rightRows, OptionalDouble.of(250), false},
            {leftRows, unknown, OptionalDouble.of(250), false},
            {leftRows, rightRows, unknown, false},
            {leftRows, rightRows, OptionalDouble.of(301), false},
    };
    return cases;
}

/**
 * Checks that {@link PushJoinIntoTableScan} pushes the join into the table scan for every
 * combination of statistics when the rule runs with {@code MOCK_SESSION}.
 * <p>
 * Row counts for the plan nodes with ids {@code "left"}, {@code "right"} and {@code "join"} are
 * injected through overridden stats; an empty {@link OptionalDouble} is turned into a {@code NaN}
 * row count. Unlike {@code testAutomaticJoinPushDown}, the join is always expected to be replaced
 * by a single table scan.
 * <p>
 * NOTE(review): {@code MOCK_SESSION} is presumably configured with a join pushdown mode other than
 * {@code AUTOMATIC} - confirm against its definition.
 *
 * @param leftRows row count reported for the left table scan, empty for unknown
 * @param rightRows row count reported for the right table scan, empty for unknown
 * @param joinRows row count reported for the join, empty for unknown
 */
@Test(dataProvider = "testJoinPushdownStatsIrrelevantIfPushdownForcedParams")
public void testJoinPushdownStatsIrrelevantIfPushdownForced(OptionalDouble leftRows, OptionalDouble rightRows, OptionalDouble joinRows)
{
    // Stats handed to the rule; only row counts are provided, no per-symbol statistics
    PlanNodeStatsEstimate leftStats = new PlanNodeStatsEstimate(leftRows.orElse(Double.NaN), ImmutableMap.of());
    PlanNodeStatsEstimate rightStats = new PlanNodeStatsEstimate(rightRows.orElse(Double.NaN), ImmutableMap.of());
    PlanNodeStatsEstimate joinStats = new PlanNodeStatsEstimate(joinRows.orElse(Double.NaN), ImmutableMap.of());

    try (RuleTester ruleTester = defaultRuleTester()) {
        // The mock connector accepts the pushdown, after verifying what it was asked to join
        MockConnectorFactory connectorFactory = createMockConnectorFactory((session, applyJoinType, left, right, joinConditions, leftAssignments, rightAssignments) -> {
            assertThat(((MockConnectorTableHandle) left).getTableName()).isEqualTo(TABLE_A_SCHEMA_TABLE_NAME);
            assertThat(((MockConnectorTableHandle) right).getTableName()).isEqualTo(TABLE_B_SCHEMA_TABLE_NAME);
            Assertions.assertThat(applyJoinType).isEqualTo(toSpiJoinType(JoinNode.Type.INNER));
            Assertions.assertThat(joinConditions).containsExactly(new JoinCondition(JoinCondition.Operator.EQUAL, COLUMN_A1_VARIABLE, COLUMN_B1_VARIABLE));

            return Optional.of(new JoinApplicationResult<>(
                    JOIN_CONNECTOR_TABLE_HANDLE,
                    JOIN_TABLE_A_COLUMN_MAPPING,
                    JOIN_TABLE_B_COLUMN_MAPPING));
        });

        ruleTester.getQueryRunner().createCatalog(MOCK_CATALOG, connectorFactory, ImmutableMap.of());

        ruleTester.assertThat(new PushJoinIntoTableScan(ruleTester.getMetadata()))
                .overrideStats("left", leftStats)
                .overrideStats("right", rightStats)
                .overrideStats("join", joinStats)
                .on(p -> {
                    Symbol columnA1Symbol = p.symbol(COLUMN_A1);
                    Symbol columnA2Symbol = p.symbol(COLUMN_A2);
                    Symbol columnB1Symbol = p.symbol(COLUMN_B1);
                    // The plan node ids must match the keys used in overrideStats above
                    TableScanNode left = new TableScanNode(
                            new PlanNodeId("left"),
                            TABLE_A_HANDLE,
                            ImmutableList.of(columnA1Symbol, columnA2Symbol),
                            ImmutableMap.of(
                                    columnA1Symbol, COLUMN_A1_HANDLE,
                                    columnA2Symbol, COLUMN_A2_HANDLE),
                            TupleDomain.all(),
                            false);

                    TableScanNode right = new TableScanNode(
                            new PlanNodeId("right"),
                            TABLE_B_HANDLE,
                            ImmutableList.of(columnB1Symbol),
                            ImmutableMap.of(columnB1Symbol, COLUMN_B1_HANDLE),
                            TupleDomain.all(),
                            false);

                    return join(new PlanNodeId("join"), JoinNode.Type.INNER, left, right, new JoinNode.EquiJoinClause(columnA1Symbol, columnB1Symbol));
                })
                .withSession(MOCK_SESSION)
                .matches(tableScan(JOIN_PUSHDOWN_SCHEMA_TABLE_NAME.getTableName()));
    }
}

/**
 * Supplies the cases for {@code testJoinPushdownStatsIrrelevantIfPushdownForced}.
 * <p>
 * Each row holds: left row count, right row count and join row count. An empty
 * {@link OptionalDouble} stands for an unknown row count.
 */
@DataProvider
public static Object[][] testJoinPushdownStatsIrrelevantIfPushdownForcedParams()
{
    OptionalDouble unknown = OptionalDouble.empty();
    OptionalDouble leftRows = OptionalDouble.of(100);
    OptionalDouble rightRows = OptionalDouble.of(200);

    Object[][] cases = {
            {leftRows, rightRows, OptionalDouble.of(133)},
            {leftRows, rightRows, OptionalDouble.of(134)},
            // combinations with one unknown row count
            {unknown, rightRows, OptionalDouble.of(250)},
            {leftRows, unknown, OptionalDouble.of(250)},
            {leftRows, rightRows, unknown},
            {leftRows, rightRows, OptionalDouble.of(301)},
    };
    return cases;
}

/**
 * Builds a {@link JoinNode} over two table scans for use in the tests of this class.
 * <p>
 * The join outputs every output symbol of {@code left} followed by every output symbol of
 * {@code right}; all optional constructor arguments are left empty.
 *
 * @param planNodeId id assigned to the created join node
 * @param joinType type of the join
 * @param left left source of the join
 * @param right right source of the join
 * @param criteria equi-join clauses, possibly none
 * @return the newly created join node
 */
private JoinNode join(PlanNodeId planNodeId, JoinNode.Type joinType, TableScanNode left, TableScanNode right, JoinNode.EquiJoinClause... criteria)
{
    // Immutable snapshot of the varargs array
    List<JoinNode.EquiJoinClause> equiCriteria = ImmutableList.copyOf(criteria);

    return new JoinNode(
            planNodeId,
            joinType,
            left,
            right,
            equiCriteria,
            left.getOutputSymbols(),
            right.getOutputSymbols(),
            false,
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            Optional.empty(),
            ImmutableMap.of(),
            Optional.empty());
}

private static TableHandle createTableHandle(ConnectorTableHandle tableHandle)
{
return createTableHandle(tableHandle, MOCK_CATALOG);
Expand Down