From 9ff79c569f017bb5ba19d5b2c706b09921ea7d3b Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Tue, 8 Aug 2023 10:45:34 +0200 Subject: [PATCH 1/2] more union merging Signed-off-by: Andres Taylor --- .../planbuilder/operators/horizon_planning.go | 4 + .../planbuilder/operators/join_merging.go | 2 +- .../planbuilder/operators/sharded_routing.go | 8 +- go/vt/vtgate/planbuilder/operators/union.go | 40 ------- .../planbuilder/operators/union_merging.go | 109 +++++++++++++++--- .../testdata/large_union_cases.json | 38 +----- .../planbuilder/testdata/union_cases.json | 32 +---- 7 files changed, 116 insertions(+), 117 deletions(-) diff --git a/go/vt/vtgate/planbuilder/operators/horizon_planning.go b/go/vt/vtgate/planbuilder/operators/horizon_planning.go index 0fcd501c377..4dfb185f07e 100644 --- a/go/vt/vtgate/planbuilder/operators/horizon_planning.go +++ b/go/vt/vtgate/planbuilder/operators/horizon_planning.go @@ -654,6 +654,10 @@ func isDistinct(op ops.Operator) bool { } func tryPushDownUnion(ctx *plancontext.PlanningContext, op *Union) (ops.Operator, *rewrite.ApplyResult, error) { + if res := compactUnion(op); res != rewrite.SameTree { + return op, res, nil + } + var sources []ops.Operator var selects []sqlparser.SelectExprs var err error diff --git a/go/vt/vtgate/planbuilder/operators/join_merging.go b/go/vt/vtgate/planbuilder/operators/join_merging.go index 4c4b1c815b4..b39949e2d2a 100644 --- a/go/vt/vtgate/planbuilder/operators/join_merging.go +++ b/go/vt/vtgate/planbuilder/operators/join_merging.go @@ -59,7 +59,7 @@ func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, jo // sharded routing is complex, so we handle it in a separate method case a == sharded && b == sharded: - return tryMergeShardedRouting(ctx, lhsRoute, rhsRoute, m, joinPredicates) + return tryMergeJoinShardedRouting(ctx, lhsRoute, rhsRoute, m, joinPredicates) default: return nil, nil diff --git a/go/vt/vtgate/planbuilder/operators/sharded_routing.go b/go/vt/vtgate/planbuilder/operators/sharded_routing.go index eb55c34b9bd..f7ae80accd5 100644 --- a/go/vt/vtgate/planbuilder/operators/sharded_routing.go +++ b/go/vt/vtgate/planbuilder/operators/sharded_routing.go @@ -594,7 +594,13 @@ func (tr *ShardedRouting) VindexExpressions() []sqlparser.Expr { return tr.Selected.ValueExprs } -func tryMergeShardedRouting(ctx *plancontext.PlanningContext, routeA *Route, routeB *Route, m merger, joinPredicates []sqlparser.Expr) (ops.Operator, error) { +func tryMergeJoinShardedRouting( + ctx *plancontext.PlanningContext, + routeA *Route, + routeB *Route, + m merger, + joinPredicates []sqlparser.Expr, +) (ops.Operator, error) { sameKeyspace := routeA.Routing.Keyspace() == routeB.Routing.Keyspace() tblA := routeA.Routing.(*ShardedRouting) tblB := routeB.Routing.(*ShardedRouting) diff --git a/go/vt/vtgate/planbuilder/operators/union.go b/go/vt/vtgate/planbuilder/operators/union.go index 72a9985cdca..7a8ae08e219 100644 --- a/go/vt/vtgate/planbuilder/operators/union.go +++ b/go/vt/vtgate/planbuilder/operators/union.go @@ -25,7 +25,6 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" - "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/rewrite" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" ) @@ -186,45 +185,6 @@ func (u *Union) GetSelectFor(source int) (*sqlparser.Select, error) { } } -func (u *Union) Compact(*plancontext.PlanningContext) (ops.Operator, *rewrite.ApplyResult, error) { - if u.distinct { - // first we remove unnecessary DISTINCTs - for idx, source := range u.Sources { - d, ok := source.(*Distinct) - if !ok || !d.Required { - continue - } - u.Sources[idx] = d.Source - } - } - - var newSources []ops.Operator - var newSelects []sqlparser.SelectExprs - merged := false - - for idx, source := range u.Sources { - other, ok := source.(*Union) - - if ok && (u.distinct || !other.distinct) { - newSources = append(newSources, other.Sources...) - newSelects = append(newSelects, other.Selects...) - merged = true - continue - } - - newSources = append(newSources, source) - newSelects = append(newSelects, u.Selects[idx]) - } - - if !merged { - return u, rewrite.SameTree, nil - } - - u.Sources = newSources - u.Selects = newSelects - return u, rewrite.NewTree("merged UNIONs", u), nil -} - func (u *Union) AddColumns(ctx *plancontext.PlanningContext, reuse bool, addToGroupBy []bool, exprs []*sqlparser.AliasedExpr) ([]int, error) { offsets := make([]int, len(exprs)) cols, err := u.GetColumns(ctx) diff --git a/go/vt/vtgate/planbuilder/operators/union_merging.go b/go/vt/vtgate/planbuilder/operators/union_merging.go index 78cc77b03e5..bc3bd448eeb 100644 --- a/go/vt/vtgate/planbuilder/operators/union_merging.go +++ b/go/vt/vtgate/planbuilder/operators/union_merging.go @@ -20,6 +20,7 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/rewrite" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" ) @@ -108,7 +109,12 @@ func mergeUnionInputsInOrder(ctx *plancontext.PlanningContext, op *Union) ([]ops // If they can be merged, a new operator with the merged routing is returned // If they cannot be merged, nil is returned. // this function is very similar to mergeJoinInputs -func mergeUnionInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, lhsExprs, rhsExprs sqlparser.SelectExprs, distinct bool) (ops.Operator, sqlparser.SelectExprs, error) { +func mergeUnionInputs( + ctx *plancontext.PlanningContext, + lhs, rhs ops.Operator, + lhsExprs, rhsExprs sqlparser.SelectExprs, + distinct bool, +) (ops.Operator, sqlparser.SelectExprs, error) { lhsRoute, rhsRoute, routingA, routingB, a, b, sameKeyspace := prepareInputRoutes(lhs, rhs) if lhsRoute == nil { return nil, nil, nil @@ -116,32 +122,68 @@ func mergeUnionInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, l switch { // if either side is a dual query, we can always merge them together - case b == dual: + // an unsharded/reference route can be merged with anything going to that keyspace + case b == dual || (b == anyShard && sameKeyspace): return createMergedUnion(ctx, lhsRoute, rhsRoute, lhsExprs, rhsExprs, distinct, routingA) - case a == dual: + case a == dual || (a == anyShard && sameKeyspace): return createMergedUnion(ctx, lhsRoute, rhsRoute, lhsExprs, rhsExprs, distinct, routingB) - case a == anyShard && sameKeyspace: + + case a == none: return createMergedUnion(ctx, lhsRoute, rhsRoute, lhsExprs, rhsExprs, distinct, routingB) - case b == anyShard && sameKeyspace: + case b == none: return createMergedUnion(ctx, lhsRoute, rhsRoute, lhsExprs, rhsExprs, distinct, routingA) + case a == sharded && b == sharded && sameKeyspace: - // If the two routes fully match, they can be merged together. - tblA := routingA.(*ShardedRouting) - tblB := routingB.(*ShardedRouting) - if tblA.RouteOpCode == engine.Scatter && tblB.RouteOpCode == engine.Scatter { - return createMergedUnion(ctx, lhsRoute, rhsRoute, lhsExprs, rhsExprs, distinct, routingB) - } - if tblA.RouteOpCode != engine.EqualUnique || tblB.RouteOpCode != engine.EqualUnique { - break + res, exprs, err := tryMergeUnionShardedRouting(ctx, lhsRoute, rhsRoute, lhsExprs, rhsExprs, distinct) + if err != nil || res != nil { + return res, exprs, err } + } + return nil, nil, nil +} + +func tryMergeUnionShardedRouting( + ctx *plancontext.PlanningContext, + routeA, routeB *Route, + exprsA, exprsB sqlparser.SelectExprs, + distinct bool, +) (ops.Operator, sqlparser.SelectExprs, error) { + tblA := routeA.Routing.(*ShardedRouting) + tblB := routeB.Routing.(*ShardedRouting) + + scatterA := tblA.RouteOpCode == engine.Scatter + scatterB := tblB.RouteOpCode == engine.Scatter + uniqueA := tblA.RouteOpCode == engine.EqualUnique + uniqueB := tblB.RouteOpCode == engine.EqualUnique + + switch { + case scatterA && scatterB: + return createMergedUnion(ctx, routeA, routeB, exprsA, exprsB, distinct, tblA) + + case uniqueA && uniqueB: aVdx := tblA.SelectedVindex() bVdx := tblB.SelectedVindex() aExpr := tblA.VindexExpressions() bExpr := tblB.VindexExpressions() if aVdx == bVdx && gen4ValuesEqual(ctx, aExpr, bExpr) { - return createMergedUnion(ctx, lhsRoute, rhsRoute, lhsExprs, rhsExprs, distinct, routingB) + return createMergedUnion(ctx, routeA, routeB, exprsA, exprsB, distinct, tblA) } + } + + switch tblA.RouteOpCode { + case engine.EqualUnique: + if tblB.RouteOpCode == engine.EqualUnique { + aVdx := tblA.SelectedVindex() + bVdx := tblB.SelectedVindex() + aExpr := tblA.VindexExpressions() + bExpr := tblB.VindexExpressions() + if aVdx == bVdx && gen4ValuesEqual(ctx, aExpr, bExpr) { + return createMergedUnion(ctx, routeA, routeB, exprsA, exprsB, distinct, tblA) + } + } + } + return nil, nil, nil } @@ -187,3 +229,42 @@ func createMergedUnion( Routing: routing, }, selectExprs, nil } + +func compactUnion(u *Union) *rewrite.ApplyResult { + if u.distinct { + // first we remove unnecessary DISTINCTs + for idx, source := range u.Sources { + d, ok := source.(*Distinct) + if !ok || !d.Required { + continue + } + u.Sources[idx] = d.Source + } + } + + var newSources []ops.Operator + var newSelects []sqlparser.SelectExprs + merged := false + + for idx, source := range u.Sources { + other, ok := source.(*Union) + + if ok && (u.distinct || !other.distinct) { + newSources = append(newSources, other.Sources...) + newSelects = append(newSelects, other.Selects...) + merged = true + continue + } + + newSources = append(newSources, source) + newSelects = append(newSelects, u.Selects[idx]) + } + + if !merged { + return rewrite.SameTree + } + + u.Sources = newSources + u.Selects = newSelects + return rewrite.NewTree("merged UNIONs", u) +} diff --git a/go/vt/vtgate/planbuilder/testdata/large_union_cases.json b/go/vt/vtgate/planbuilder/testdata/large_union_cases.json index bed4a521b82..1ad7b33d589 100644 --- a/go/vt/vtgate/planbuilder/testdata/large_union_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/large_union_cases.json @@ -23,8 +23,8 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select content, user_id, weight_string(content), weight_string(user_id) from music where 1 != 1", - "Query": "select distinct content, user_id, weight_string(content), weight_string(user_id) from music where user_id = 1270698330 order by created_at asc, id asc limit 11", + "FieldQuery": "select content, user_id, weight_string(content), weight_string(user_id) from ((select content, user_id from music where 1 != 1) union (select content, user_id from music where 1 != 1)) as dt where 1 != 1", + "Query": "select content, user_id, weight_string(content), weight_string(user_id) from ((select content, user_id from music where user_id = 1270698330 order by created_at asc, id asc limit 11) union (select content, user_id from music where user_id = 1270698330 order by created_at asc, id asc limit 11)) as dt", "Table": "music", "Values": [ "INT64(1270698330)" @@ -38,8 +38,8 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select content, user_id, weight_string(content), weight_string(user_id) from music where 1 != 1", - "Query": "select distinct content, user_id, weight_string(content), weight_string(user_id) from music where user_id = 1270699497 order by created_at asc, id asc limit 11", + "FieldQuery": "select content, user_id, weight_string(content), weight_string(user_id) from ((select content, user_id from music where 1 != 1) union (select content, user_id from music where 1 != 1)) as dt where 1 != 1", + "Query": "select content, user_id, weight_string(content), weight_string(user_id) from ((select content, user_id from music where user_id = 1270699497 order by created_at asc, id asc limit 11) union (select content, user_id from music where user_id = 1270699497 order by created_at asc, id asc limit 11)) as dt", "Table": "music", "Values": [ "INT64(1270699497)" @@ -406,36 +406,6 @@ ], "Vindex": "user_index" }, - { - "OperatorType": "Route", - "Variant": "EqualUnique", - "Keyspace": { - "Name": "user", - "Sharded": true - }, - "FieldQuery": "select content, user_id, weight_string(content), weight_string(user_id) from music where 1 != 1", - "Query": "select distinct content, user_id, weight_string(content), weight_string(user_id) from music where user_id = 1270698330 order by created_at asc, id asc limit 11", - "Table": "music", - "Values": [ - "INT64(1270698330)" - ], - "Vindex": "user_index" - }, - { - "OperatorType": "Route", - "Variant": "EqualUnique", - "Keyspace": { - "Name": "user", - "Sharded": true - }, - "FieldQuery": "select content, user_id, weight_string(content), weight_string(user_id) from music where 1 != 1", - "Query": "select distinct content, user_id, weight_string(content), weight_string(user_id) from music where user_id = 1270699497 order by created_at asc, id asc limit 11", - "Table": "music", - "Values": [ - "INT64(1270699497)" - ], - "Vindex": "user_index" - }, { "OperatorType": "Route", "Variant": "EqualUnique", diff --git a/go/vt/vtgate/planbuilder/testdata/union_cases.json b/go/vt/vtgate/planbuilder/testdata/union_cases.json index b718065b9b5..a0f9ae64214 100644 --- a/go/vt/vtgate/planbuilder/testdata/union_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/union_cases.json @@ -926,8 +926,8 @@ "Name": "main", "Sharded": false }, - "FieldQuery": "select col, weight_string(col) from unsharded where 1 != 1", - "Query": "select distinct col, weight_string(col) from unsharded", + "FieldQuery": "select col, weight_string(col) from (select col from unsharded where 1 != 1 union select col2 from unsharded where 1 != 1) as dt where 1 != 1", + "Query": "select col, weight_string(col) from (select col from unsharded union select col2 from unsharded) as dt", "Table": "unsharded" }, { @@ -937,31 +937,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select id, weight_string(id) from `user` where 1 != 1", - "Query": "select distinct id, weight_string(id) from `user`", - "Table": "`user`" - }, - { - "OperatorType": "Route", - "Variant": "Unsharded", - "Keyspace": { - "Name": "main", - "Sharded": false - }, - "FieldQuery": "select col2, weight_string(col2) from unsharded where 1 != 1", - "Query": "select distinct col2, weight_string(col2) from unsharded", - "Table": "unsharded" - }, - { - "OperatorType": "Route", - "Variant": "Scatter", - "Keyspace": { - "Name": "user", - "Sharded": true - }, - "FieldQuery": "select col, weight_string(col) from user_extra where 1 != 1", - "Query": "select distinct col, weight_string(col) from user_extra", - "Table": "user_extra" + "FieldQuery": "select id, weight_string(id) from (select id from `user` where 1 != 1 union select col from user_extra where 1 != 1) as dt where 1 != 1", + "Query": "select id, weight_string(id) from (select id from `user` union select col from user_extra) as dt", + "Table": "`user`, user_extra" } ] } From 1e642ae123a65ded7b39c641f1232ddc7affd7a3 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Tue, 8 Aug 2023 15:17:49 +0200 Subject: [PATCH 2/2] more union merging Signed-off-by: Andres Taylor --- .../planbuilder/operators/union_merging.go | 19 ++-------- .../planbuilder/testdata/union_cases.json | 38 +++++-------------- 2 files changed, 13 insertions(+), 44 deletions(-) diff --git a/go/vt/vtgate/planbuilder/operators/union_merging.go b/go/vt/vtgate/planbuilder/operators/union_merging.go index bc3bd448eeb..4c8b02f76d8 100644 --- a/go/vt/vtgate/planbuilder/operators/union_merging.go +++ b/go/vt/vtgate/planbuilder/operators/union_merging.go @@ -157,9 +157,12 @@ func tryMergeUnionShardedRouting( uniqueB := tblB.RouteOpCode == engine.EqualUnique switch { - case scatterA && scatterB: + case scatterA: return createMergedUnion(ctx, routeA, routeB, exprsA, exprsB, distinct, tblA) + case scatterB: + return createMergedUnion(ctx, routeA, routeB, exprsA, exprsB, distinct, tblB) + case uniqueA && uniqueB: aVdx := tblA.SelectedVindex() bVdx := tblB.SelectedVindex() @@ -168,20 +171,6 @@ func tryMergeUnionShardedRouting( if aVdx == bVdx && gen4ValuesEqual(ctx, aExpr, bExpr) { return createMergedUnion(ctx, routeA, routeB, exprsA, exprsB, distinct, tblA) } - - } - - switch tblA.RouteOpCode { - case engine.EqualUnique: - if tblB.RouteOpCode == engine.EqualUnique { - aVdx := tblA.SelectedVindex() - bVdx := tblB.SelectedVindex() - aExpr := tblA.VindexExpressions() - bExpr := tblB.VindexExpressions() - if aVdx == bVdx && gen4ValuesEqual(ctx, aExpr, bExpr) { - return createMergedUnion(ctx, routeA, routeB, exprsA, exprsB, distinct, tblA) - } - } } return nil, nil, nil diff --git a/go/vt/vtgate/planbuilder/testdata/union_cases.json b/go/vt/vtgate/planbuilder/testdata/union_cases.json index a0f9ae64214..e6f84dd1631 100644 --- a/go/vt/vtgate/planbuilder/testdata/union_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/union_cases.json @@ -298,35 +298,15 @@ "QueryType": "SELECT", "Original": "select id from user where id = 1 union select id from user where id = 1 union all select id from user", "Instructions": { - "OperatorType": "Concatenate", - "Inputs": [ - { - "OperatorType": "Route", - "Variant": "EqualUnique", - "Keyspace": { - "Name": "user", - "Sharded": true - }, - "FieldQuery": "select id from `user` where 1 != 1 union select id from `user` where 1 != 1", - "Query": "select id from `user` where id = 1 union select id from `user` where id = 1", - "Table": "`user`", - "Values": [ - "INT64(1)" - ], - "Vindex": "user_index" - }, - { - "OperatorType": "Route", - "Variant": "Scatter", - "Keyspace": { - "Name": "user", - "Sharded": true - }, - "FieldQuery": "select id from `user` where 1 != 1", - "Query": "select id from `user`", - "Table": "`user`" - } - ] + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select id from `user` where 1 != 1 union select id from `user` where 1 != 1 union all select id from `user` where 1 != 1", + "Query": "select id from `user` where id = 1 union select id from `user` where id = 1 union all select id from `user`", + "Table": "`user`" }, "TablesUsed": [ "user.user"