Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/operators/horizon_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,10 @@ func isDistinct(op ops.Operator) bool {
}

func tryPushDownUnion(ctx *plancontext.PlanningContext, op *Union) (ops.Operator, *rewrite.ApplyResult, error) {
if res := compactUnion(op); res != rewrite.SameTree {
return op, res, nil
}

var sources []ops.Operator
var selects []sqlparser.SelectExprs
var err error
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/join_merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, jo

// sharded routing is complex, so we handle it in a separate method
case a == sharded && b == sharded:
return tryMergeShardedRouting(ctx, lhsRoute, rhsRoute, m, joinPredicates)
return tryMergeJoinShardedRouting(ctx, lhsRoute, rhsRoute, m, joinPredicates)

default:
return nil, nil
Expand Down
8 changes: 7 additions & 1 deletion go/vt/vtgate/planbuilder/operators/sharded_routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,13 @@ func (tr *ShardedRouting) VindexExpressions() []sqlparser.Expr {
return tr.Selected.ValueExprs
}

func tryMergeShardedRouting(ctx *plancontext.PlanningContext, routeA *Route, routeB *Route, m merger, joinPredicates []sqlparser.Expr) (ops.Operator, error) {
func tryMergeJoinShardedRouting(
ctx *plancontext.PlanningContext,
routeA *Route,
routeB *Route,
Comment on lines +599 to +600
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we can reuse types here

m merger,
joinPredicates []sqlparser.Expr,
) (ops.Operator, error) {
sameKeyspace := routeA.Routing.Keyspace() == routeB.Routing.Keyspace()
tblA := routeA.Routing.(*ShardedRouting)
tblB := routeB.Routing.(*ShardedRouting)
Expand Down
40 changes: 0 additions & 40 deletions go/vt/vtgate/planbuilder/operators/union.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/rewrite"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
)

Expand Down Expand Up @@ -186,45 +185,6 @@ func (u *Union) GetSelectFor(source int) (*sqlparser.Select, error) {
}
}

// Compact flattens nested UNION operators into this one.
//
// When this union is DISTINCT, any direct source that is a *Distinct with
// Required set is first replaced by its own source. After that, every direct
// source that is itself a *Union is inlined (its Sources and Selects are
// spliced in at that position) when either this union is DISTINCT or the
// nested union is not DISTINCT.
//
// The PlanningContext argument is unused. The receiver is mutated in place
// and always returned; the ApplyResult is rewrite.SameTree when no nested
// union was inlined, and a new-tree result otherwise. The error is always nil.
func (u *Union) Compact(*plancontext.PlanningContext) (ops.Operator, *rewrite.ApplyResult, error) {
	if u.distinct {
		// first we remove unnecessary DISTINCTs
		for pos, input := range u.Sources {
			if dist, wrapped := input.(*Distinct); wrapped && dist.Required {
				u.Sources[pos] = dist.Source
			}
		}
	}

	var (
		sources []ops.Operator
		selects []sqlparser.SelectExprs
	)
	merged := false

	for pos, input := range u.Sources {
		switch child, nested := input.(*Union); {
		case nested && (u.distinct || !child.distinct):
			// splice the nested union's inputs in place of the nested union
			sources = append(sources, child.Sources...)
			selects = append(selects, child.Selects...)
			merged = true
		default:
			sources = append(sources, input)
			selects = append(selects, u.Selects[pos])
		}
	}

	if merged {
		u.Sources, u.Selects = sources, selects
		return u, rewrite.NewTree("merged UNIONs", u), nil
	}
	return u, rewrite.SameTree, nil
}

func (u *Union) AddColumns(ctx *plancontext.PlanningContext, reuse bool, addToGroupBy []bool, exprs []*sqlparser.AliasedExpr) ([]int, error) {
offsets := make([]int, len(exprs))
cols, err := u.GetColumns(ctx)
Expand Down
98 changes: 84 additions & 14 deletions go/vt/vtgate/planbuilder/operators/union_merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/rewrite"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
)

Expand Down Expand Up @@ -108,40 +109,70 @@ func mergeUnionInputsInOrder(ctx *plancontext.PlanningContext, op *Union) ([]ops
// If they can be merged, a new operator with the merged routing is returned
// If they cannot be merged, nil is returned.
// this function is very similar to mergeJoinInputs
func mergeUnionInputs(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, lhsExprs, rhsExprs sqlparser.SelectExprs, distinct bool) (ops.Operator, sqlparser.SelectExprs, error) {
func mergeUnionInputs(
ctx *plancontext.PlanningContext,
lhs, rhs ops.Operator,
lhsExprs, rhsExprs sqlparser.SelectExprs,
distinct bool,
) (ops.Operator, sqlparser.SelectExprs, error) {
lhsRoute, rhsRoute, routingA, routingB, a, b, sameKeyspace := prepareInputRoutes(lhs, rhs)
if lhsRoute == nil {
return nil, nil, nil
}

switch {
// if either side is a dual query, we can always merge them together
case b == dual:
// an unsharded/reference route can be merged with anything going to that keyspace
case b == dual || (b == anyShard && sameKeyspace):
return createMergedUnion(ctx, lhsRoute, rhsRoute, lhsExprs, rhsExprs, distinct, routingA)
case a == dual:
case a == dual || (a == anyShard && sameKeyspace):
Comment on lines +126 to +128
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the parentheses around the && block are not needed

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the parentheses improve readability

return createMergedUnion(ctx, lhsRoute, rhsRoute, lhsExprs, rhsExprs, distinct, routingB)
case a == anyShard && sameKeyspace:

case a == none:
return createMergedUnion(ctx, lhsRoute, rhsRoute, lhsExprs, rhsExprs, distinct, routingB)
case b == anyShard && sameKeyspace:
case b == none:
return createMergedUnion(ctx, lhsRoute, rhsRoute, lhsExprs, rhsExprs, distinct, routingA)

case a == sharded && b == sharded && sameKeyspace:
// If the two routes fully match, they can be merged together.
tblA := routingA.(*ShardedRouting)
tblB := routingB.(*ShardedRouting)
if tblA.RouteOpCode == engine.Scatter && tblB.RouteOpCode == engine.Scatter {
return createMergedUnion(ctx, lhsRoute, rhsRoute, lhsExprs, rhsExprs, distinct, routingB)
}
if tblA.RouteOpCode != engine.EqualUnique || tblB.RouteOpCode != engine.EqualUnique {
break
res, exprs, err := tryMergeUnionShardedRouting(ctx, lhsRoute, rhsRoute, lhsExprs, rhsExprs, distinct)
if err != nil || res != nil {
return res, exprs, err
}
}
return nil, nil, nil
}

// tryMergeUnionShardedRouting attempts to merge the two sharded routes that
// feed a UNION into a single route.
//
// Both routeA.Routing and routeB.Routing must be *ShardedRouting; the
// unchecked type assertions below panic otherwise.
//
// The cases are tried in this order:
//   - routeA is a Scatter: merge, keeping routeA's routing;
//   - routeB is a Scatter: merge, keeping routeB's routing;
//   - both are EqualUnique: merge, keeping routeA's routing, but only when both
//     selected the same vindex and gen4ValuesEqual accepts their vindex
//     expressions.
//
// In every other case it returns (nil, nil, nil), meaning the inputs were not
// merged.
//
// NOTE(review): there is no keyspace comparison in here; the visible caller
// only dispatches to this function when both routes share a keyspace.
func tryMergeUnionShardedRouting(
	ctx *plancontext.PlanningContext,
	routeA, routeB *Route,
	exprsA, exprsB sqlparser.SelectExprs,
	distinct bool,
) (ops.Operator, sqlparser.SelectExprs, error) {
	tblA := routeA.Routing.(*ShardedRouting)
	tblB := routeB.Routing.(*ShardedRouting)

	scatterA := tblA.RouteOpCode == engine.Scatter
	scatterB := tblB.RouteOpCode == engine.Scatter
	uniqueA := tblA.RouteOpCode == engine.EqualUnique
	uniqueB := tblB.RouteOpCode == engine.EqualUnique

	switch {
	case scatterA:
		return createMergedUnion(ctx, routeA, routeB, exprsA, exprsB, distinct, tblA)

	case scatterB:
		return createMergedUnion(ctx, routeA, routeB, exprsA, exprsB, distinct, tblB)

	case uniqueA && uniqueB:
		aVdx := tblA.SelectedVindex()
		bVdx := tblB.SelectedVindex()
		aExpr := tblA.VindexExpressions()
		bExpr := tblB.VindexExpressions()
		if aVdx == bVdx && gen4ValuesEqual(ctx, aExpr, bExpr) {
			return createMergedUnion(ctx, routeA, routeB, exprsA, exprsB, distinct, tblA)
		}
	}

	// not mergeable
	return nil, nil, nil
}

Expand Down Expand Up @@ -187,3 +218,42 @@ func createMergedUnion(
Routing: routing,
}, selectExprs, nil
}

// compactUnion flattens nested UNION operators into u, mutating u in place.
//
// If u is DISTINCT, every direct source that is a *Distinct with Required set
// is first replaced by that Distinct's source. Then each direct source that is
// itself a *Union is replaced by its own Sources and Selects, provided u is
// DISTINCT or the nested union is not DISTINCT.
//
// It returns rewrite.SameTree when no nested union was inlined, and a
// new-tree result otherwise.
func compactUnion(u *Union) *rewrite.ApplyResult {
	if u.distinct {
		// first we remove unnecessary DISTINCTs
		for i := range u.Sources {
			dist, wrapped := u.Sources[i].(*Distinct)
			if wrapped && dist.Required {
				u.Sources[i] = dist.Source
			}
		}
	}

	var (
		flatSources []ops.Operator
		flatSelects []sqlparser.SelectExprs
		flattened   bool
	)

	for i, src := range u.Sources {
		nested, isUnion := src.(*Union)

		// keep the source as-is unless it is a union we are allowed to inline
		if !isUnion || (!u.distinct && nested.distinct) {
			flatSources = append(flatSources, src)
			flatSelects = append(flatSelects, u.Selects[i])
			continue
		}

		flatSources = append(flatSources, nested.Sources...)
		flatSelects = append(flatSelects, nested.Selects...)
		flattened = true
	}

	if flattened {
		u.Sources = flatSources
		u.Selects = flatSelects
		return rewrite.NewTree("merged UNIONs", u)
	}

	return rewrite.SameTree
}
38 changes: 4 additions & 34 deletions go/vt/vtgate/planbuilder/testdata/large_union_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
"Name": "user",
"Sharded": true
},
"FieldQuery": "select content, user_id, weight_string(content), weight_string(user_id) from music where 1 != 1",
"Query": "select distinct content, user_id, weight_string(content), weight_string(user_id) from music where user_id = 1270698330 order by created_at asc, id asc limit 11",
"FieldQuery": "select content, user_id, weight_string(content), weight_string(user_id) from ((select content, user_id from music where 1 != 1) union (select content, user_id from music where 1 != 1)) as dt where 1 != 1",
"Query": "select content, user_id, weight_string(content), weight_string(user_id) from ((select content, user_id from music where user_id = 1270698330 order by created_at asc, id asc limit 11) union (select content, user_id from music where user_id = 1270698330 order by created_at asc, id asc limit 11)) as dt",
"Table": "music",
"Values": [
"INT64(1270698330)"
Expand All @@ -38,8 +38,8 @@
"Name": "user",
"Sharded": true
},
"FieldQuery": "select content, user_id, weight_string(content), weight_string(user_id) from music where 1 != 1",
"Query": "select distinct content, user_id, weight_string(content), weight_string(user_id) from music where user_id = 1270699497 order by created_at asc, id asc limit 11",
"FieldQuery": "select content, user_id, weight_string(content), weight_string(user_id) from ((select content, user_id from music where 1 != 1) union (select content, user_id from music where 1 != 1)) as dt where 1 != 1",
"Query": "select content, user_id, weight_string(content), weight_string(user_id) from ((select content, user_id from music where user_id = 1270699497 order by created_at asc, id asc limit 11) union (select content, user_id from music where user_id = 1270699497 order by created_at asc, id asc limit 11)) as dt",
"Table": "music",
"Values": [
"INT64(1270699497)"
Expand Down Expand Up @@ -406,36 +406,6 @@
],
"Vindex": "user_index"
},
{
"OperatorType": "Route",
"Variant": "EqualUnique",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select content, user_id, weight_string(content), weight_string(user_id) from music where 1 != 1",
"Query": "select distinct content, user_id, weight_string(content), weight_string(user_id) from music where user_id = 1270698330 order by created_at asc, id asc limit 11",
"Table": "music",
"Values": [
"INT64(1270698330)"
],
"Vindex": "user_index"
},
{
"OperatorType": "Route",
"Variant": "EqualUnique",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select content, user_id, weight_string(content), weight_string(user_id) from music where 1 != 1",
"Query": "select distinct content, user_id, weight_string(content), weight_string(user_id) from music where user_id = 1270699497 order by created_at asc, id asc limit 11",
"Table": "music",
"Values": [
"INT64(1270699497)"
],
"Vindex": "user_index"
},
{
"OperatorType": "Route",
"Variant": "EqualUnique",
Expand Down
70 changes: 14 additions & 56 deletions go/vt/vtgate/planbuilder/testdata/union_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -298,35 +298,15 @@
"QueryType": "SELECT",
"Original": "select id from user where id = 1 union select id from user where id = 1 union all select id from user",
"Instructions": {
"OperatorType": "Concatenate",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "EqualUnique",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select id from `user` where 1 != 1 union select id from `user` where 1 != 1",
"Query": "select id from `user` where id = 1 union select id from `user` where id = 1",
"Table": "`user`",
"Values": [
"INT64(1)"
],
"Vindex": "user_index"
},
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select id from `user` where 1 != 1",
"Query": "select id from `user`",
"Table": "`user`"
}
]
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select id from `user` where 1 != 1 union select id from `user` where 1 != 1 union all select id from `user` where 1 != 1",
"Query": "select id from `user` where id = 1 union select id from `user` where id = 1 union all select id from `user`",
"Table": "`user`"
},
"TablesUsed": [
"user.user"
Expand Down Expand Up @@ -926,30 +906,8 @@
"Name": "main",
"Sharded": false
},
"FieldQuery": "select col, weight_string(col) from unsharded where 1 != 1",
"Query": "select distinct col, weight_string(col) from unsharded",
"Table": "unsharded"
},
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select id, weight_string(id) from `user` where 1 != 1",
"Query": "select distinct id, weight_string(id) from `user`",
"Table": "`user`"
},
{
"OperatorType": "Route",
"Variant": "Unsharded",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select col2, weight_string(col2) from unsharded where 1 != 1",
"Query": "select distinct col2, weight_string(col2) from unsharded",
"FieldQuery": "select col, weight_string(col) from (select col from unsharded where 1 != 1 union select col2 from unsharded where 1 != 1) as dt where 1 != 1",
"Query": "select col, weight_string(col) from (select col from unsharded union select col2 from unsharded) as dt",
"Table": "unsharded"
},
{
Expand All @@ -959,9 +917,9 @@
"Name": "user",
"Sharded": true
},
"FieldQuery": "select col, weight_string(col) from user_extra where 1 != 1",
"Query": "select distinct col, weight_string(col) from user_extra",
"Table": "user_extra"
"FieldQuery": "select id, weight_string(id) from (select id from `user` where 1 != 1 union select col from user_extra where 1 != 1) as dt where 1 != 1",
"Query": "select id, weight_string(id) from (select id from `user` union select col from user_extra) as dt",
"Table": "`user`, user_extra"
}
]
}
Expand Down