diff --git a/go/vt/vtgate/planbuilder/operator_transformers.go b/go/vt/vtgate/planbuilder/operator_transformers.go index e2a7967a1b6..9e12cb84948 100644 --- a/go/vt/vtgate/planbuilder/operator_transformers.go +++ b/go/vt/vtgate/planbuilder/operator_transformers.go @@ -21,6 +21,10 @@ import ( "strconv" "strings" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/rewrite" + + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" @@ -39,7 +43,7 @@ import ( "vitess.io/vitess/go/vt/vterrors" ) -func transformToLogicalPlan(ctx *plancontext.PlanningContext, op operators.Operator, isRoot bool) (logicalPlan, error) { +func transformToLogicalPlan(ctx *plancontext.PlanningContext, op ops.Operator, isRoot bool) (logicalPlan, error) { switch op := op.(type) { case *operators.Route: return transformRoutePlan(ctx, op) @@ -167,7 +171,10 @@ func transformRoutePlan(ctx *plancontext.PlanningContext, op *operators.Route) ( values = op.Selected.Values } condition := getVindexPredicate(ctx, op) - sel := toSQL(ctx, op.Source) + sel, err := operators.ToSQL(ctx, op.Source) + if err != nil { + return nil, err + } replaceSubQuery(ctx, sel) return &routeGen4{ eroute: &engine.Route{ @@ -323,7 +330,7 @@ func getVindexPredicate(ctx *plancontext.PlanningContext, op *operators.Route) s func getAllTableNames(op *operators.Route) ([]string, error) { tableNameMap := map[string]any{} - err := operators.VisitTopDown(op, func(op operators.Operator) error { + err := rewrite.Visit(op, func(op ops.Operator) error { tbl, isTbl := op.(*operators.Table) var name string if isTbl { diff --git a/go/vt/vtgate/planbuilder/operator_to_query.go b/go/vt/vtgate/planbuilder/operators/SQL_builder.go similarity index 69% rename from go/vt/vtgate/planbuilder/operator_to_query.go rename to go/vt/vtgate/planbuilder/operators/SQL_builder.go index 440cd7d9beb..8cbc688a157 100644 --- a/go/vt/vtgate/planbuilder/operator_to_query.go 
+++ b/go/vt/vtgate/planbuilder/operators/SQL_builder.go @@ -1,5 +1,5 @@ /* -Copyright 2021 The Vitess Authors. +Copyright 2022 The Vitess Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,106 +14,38 @@ See the License for the specific language governing permissions and limitations under the License. */ -package planbuilder +package operators import ( - "fmt" "sort" "strings" - "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" + "vitess.io/vitess/go/vt/log" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" - "vitess.io/vitess/go/vt/vtgate/semantics" - - "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/vtgate/planbuilder/operators" ) -type queryBuilder struct { - ctx *plancontext.PlanningContext - sel sqlparser.SelectStatement - tableNames []string -} +type ( + queryBuilder struct { + ctx *plancontext.PlanningContext + sel sqlparser.SelectStatement + tableNames []string + } +) -func toSQL(ctx *plancontext.PlanningContext, op operators.Operator) sqlparser.SelectStatement { +func ToSQL(ctx *plancontext.PlanningContext, op ops.Operator) (sqlparser.SelectStatement, error) { q := &queryBuilder{ctx: ctx} - buildQuery(op, q) - q.sortTables() - return q.sel -} - -func buildQuery(op operators.Operator, qb *queryBuilder) { - switch op := op.(type) { - case *operators.Table: - dbName := "" - - if op.QTable.IsInfSchema { - dbName = op.QTable.Table.Qualifier.String() - } - qb.addTable(dbName, op.QTable.Table.Name.String(), op.QTable.Alias.As.String(), operators.TableID(op), op.QTable.Alias.Hints) - for _, pred := range op.QTable.Predicates { - qb.addPredicate(pred) - } - for _, name := range op.Columns { - qb.addProjection(&sqlparser.AliasedExpr{Expr: name}) - } - case *operators.ApplyJoin: - buildQuery(op.LHS, 
qb) - // If we are going to add the predicate used in join here - // We should not add the predicate's copy of when it was split into - // two parts. To avoid this, we use the SkipPredicates map. - for _, expr := range qb.ctx.JoinPredicates[op.Predicate] { - qb.ctx.SkipPredicates[expr] = nil - } - qbR := &queryBuilder{ctx: qb.ctx} - buildQuery(op.RHS, qbR) - if op.LeftJoin { - qb.joinOuterWith(qbR, op.Predicate) - } else { - qb.joinInnerWith(qbR, op.Predicate) - } - case *operators.Filter: - buildQuery(op.Source, qb) - for _, pred := range op.Predicates { - qb.addPredicate(pred) - } - case *operators.Derived: - buildQuery(op.Source, qb) - sel := qb.sel.(*sqlparser.Select) // we can only handle SELECT in derived tables at the moment - qb.sel = nil - opQuery := sqlparser.RemoveKeyspace(op.Query).(*sqlparser.Select) - sel.Limit = opQuery.Limit - sel.OrderBy = opQuery.OrderBy - sel.GroupBy = opQuery.GroupBy - sel.Having = opQuery.Having - sel.SelectExprs = opQuery.SelectExprs - qb.addTableExpr(op.Alias, op.Alias, operators.TableID(op), &sqlparser.DerivedTable{ - Select: sel, - }, nil, op.ColumnAliases) - for _, col := range op.Columns { - qb.addProjection(&sqlparser.AliasedExpr{Expr: col}) - } - default: - panic(fmt.Sprintf("%T", op)) + err := buildQuery(op, q) + if err != nil { + return nil, err } -} - -func (qb *queryBuilder) sortTables() { - _ = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) { - sel, isSel := node.(*sqlparser.Select) - if !isSel { - return true, nil - } - ts := &tableSorter{ - sel: sel, - tbl: qb.ctx.SemTable, - } - sort.Sort(ts) - return true, nil - }, qb.sel) - + q.sortTables() + return q.sel, nil } func (qb *queryBuilder) addTable(db, tableName, alias string, tableID semantics.TableSet, hints sqlparser.IndexHints) { @@ -256,6 +188,22 @@ func (qb *queryBuilder) hasTable(tableName string) bool { return false } +func (qb *queryBuilder) sortTables() { + _ = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) 
{ + sel, isSel := node.(*sqlparser.Select) + if !isSel { + return true, nil + } + ts := &tableSorter{ + sel: sel, + tbl: qb.ctx.SemTable, + } + sort.Sort(ts) + return true, nil + }, qb.sel) + +} + type tableSorter struct { sel *sqlparser.Select tbl *semantics.SemTable @@ -286,3 +234,151 @@ func (ts *tableSorter) Less(i, j int) bool { func (ts *tableSorter) Swap(i, j int) { ts.sel.From[i], ts.sel.From[j] = ts.sel.From[j], ts.sel.From[i] } + +func (h *Horizon) toSQL(qb *queryBuilder) error { + err := stripDownQuery(h.Select, qb.sel) + if err != nil { + return err + } + sqlparser.Rewrite(qb.sel, func(cursor *sqlparser.Cursor) bool { + if aliasedExpr, ok := cursor.Node().(sqlparser.SelectExpr); ok { + removeKeyspaceFromSelectExpr(aliasedExpr) + } + return true + }, nil) + return nil +} + +func removeKeyspaceFromSelectExpr(expr sqlparser.SelectExpr) { + switch expr := expr.(type) { + case *sqlparser.AliasedExpr: + sqlparser.RemoveKeyspaceFromColName(expr.Expr) + case *sqlparser.StarExpr: + expr.TableName.Qualifier = sqlparser.NewIdentifierCS("") + } +} + +func stripDownQuery(from, to sqlparser.SelectStatement) error { + var err error + + switch node := from.(type) { + case *sqlparser.Select: + toNode, ok := to.(*sqlparser.Select) + if !ok { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "AST did not match") + } + toNode.Distinct = node.Distinct + toNode.GroupBy = node.GroupBy + toNode.Having = node.Having + toNode.OrderBy = node.OrderBy + toNode.Comments = node.Comments + toNode.SelectExprs = node.SelectExprs + for _, expr := range toNode.SelectExprs { + removeKeyspaceFromSelectExpr(expr) + } + case *sqlparser.Union: + toNode, ok := to.(*sqlparser.Union) + if !ok { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "AST did not match") + } + err = stripDownQuery(node.Left, toNode.Left) + if err != nil { + return err + } + err = stripDownQuery(node.Right, toNode.Right) + if err != nil { + return err + } + toNode.OrderBy = node.OrderBy + default: + return 
vterrors.Errorf(vtrpcpb.Code_INTERNAL, "BUG: this should not happen - we have covered all implementations of SelectStatement %T", from) + } + return nil +} + +func buildQuery(op ops.Operator, qb *queryBuilder) error { + switch op := op.(type) { + case *Table: + dbName := "" + + if op.QTable.IsInfSchema { + dbName = op.QTable.Table.Qualifier.String() + } + qb.addTable(dbName, op.QTable.Table.Name.String(), op.QTable.Alias.As.String(), TableID(op), op.QTable.Alias.Hints) + for _, pred := range op.QTable.Predicates { + qb.addPredicate(pred) + } + for _, name := range op.Columns { + qb.addProjection(&sqlparser.AliasedExpr{Expr: name}) + } + case *ApplyJoin: + err := buildQuery(op.LHS, qb) + if err != nil { + return err + } + // If we are going to add the predicate used in join here + // We should not add the predicate's copy of when it was split into + // two parts. To avoid this, we use the SkipPredicates map. + for _, expr := range qb.ctx.JoinPredicates[op.Predicate] { + qb.ctx.SkipPredicates[expr] = nil + } + qbR := &queryBuilder{ctx: qb.ctx} + err = buildQuery(op.RHS, qbR) + if err != nil { + return err + } + if op.LeftJoin { + qb.joinOuterWith(qbR, op.Predicate) + } else { + qb.joinInnerWith(qbR, op.Predicate) + } + case *Filter: + err := buildQuery(op.Source, qb) + if err != nil { + return err + } + for _, pred := range op.Predicates { + qb.addPredicate(pred) + } + case *Derived: + err := buildQuery(op.Source, qb) + if err != nil { + return err + } + sel := qb.sel.(*sqlparser.Select) // we can only handle SELECT in derived tables at the moment + qb.sel = nil + opQuery := sqlparser.RemoveKeyspace(op.Query).(*sqlparser.Select) + sel.Limit = opQuery.Limit + sel.OrderBy = opQuery.OrderBy + sel.GroupBy = opQuery.GroupBy + sel.Having = opQuery.Having + sel.SelectExprs = opQuery.SelectExprs + qb.addTableExpr(op.Alias, op.Alias, TableID(op), &sqlparser.DerivedTable{ + Select: sel, + }, nil, op.ColumnAliases) + for _, col := range op.Columns { + 
qb.addProjection(&sqlparser.AliasedExpr{Expr: col}) + } + case *Horizon: + err := buildQuery(op.Source, qb) + if err != nil { + return err + } + + err = stripDownQuery(op.Select, qb.sel) + if err != nil { + return err + } + sqlparser.Rewrite(qb.sel, func(cursor *sqlparser.Cursor) bool { + if aliasedExpr, ok := cursor.Node().(sqlparser.SelectExpr); ok { + removeKeyspaceFromSelectExpr(aliasedExpr) + } + return true + }, nil) + return nil + + default: + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "don't know how to turn %T into SQL", op) + } + return nil +} diff --git a/go/vt/vtgate/planbuilder/operators/apply_join.go b/go/vt/vtgate/planbuilder/operators/apply_join.go index 0c7f904d4f6..6c23ab94eb1 100644 --- a/go/vt/vtgate/planbuilder/operators/apply_join.go +++ b/go/vt/vtgate/planbuilder/operators/apply_join.go @@ -20,17 +20,18 @@ import ( "golang.org/x/exp/maps" "golang.org/x/exp/slices" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" - "vitess.io/vitess/go/vt/vtgate/semantics" ) // ApplyJoin is a nested loop join - for each row on the LHS, // we'll execute the plan on the RHS, feeding data from left to right type ApplyJoin struct { - LHS, RHS Operator + LHS, RHS ops.Operator // Columns stores the column indexes of the columns coming from the left and right side // negative value comes from LHS and positive from RHS @@ -52,9 +53,9 @@ type ApplyJoin struct { Predicate sqlparser.Expr } -var _ PhysicalOperator = (*ApplyJoin)(nil) +var _ ops.PhysicalOperator = (*ApplyJoin)(nil) -func NewApplyJoin(lhs, rhs Operator, predicate sqlparser.Expr, leftOuterJoin bool) *ApplyJoin { +func NewApplyJoin(lhs, rhs ops.Operator, predicate sqlparser.Expr, leftOuterJoin bool) *ApplyJoin { return &ApplyJoin{ LHS: lhs, RHS: rhs, @@ -68,8 +69,7 @@ func NewApplyJoin(lhs, rhs Operator, predicate 
sqlparser.Expr, leftOuterJoin boo func (a *ApplyJoin) IPhysical() {} // Clone implements the Operator interface -func (a *ApplyJoin) clone(inputs []Operator) Operator { - checkSize(inputs, 2) +func (a *ApplyJoin) Clone(inputs []ops.Operator) ops.Operator { return &ApplyJoin{ LHS: inputs[0], RHS: inputs[1], @@ -82,46 +82,42 @@ func (a *ApplyJoin) clone(inputs []Operator) Operator { } } -func (a *ApplyJoin) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (Operator, error) { - return addPredicate(a, ctx, expr, false) +func (a *ApplyJoin) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (ops.Operator, error) { + return AddPredicate(a, ctx, expr, false, newFilter) } -// inputs implements the Operator interface -func (a *ApplyJoin) inputs() []Operator { - return []Operator{a.LHS, a.RHS} +// Inputs implements the Operator interface +func (a *ApplyJoin) Inputs() []ops.Operator { + return []ops.Operator{a.LHS, a.RHS} } -var _ joinOperator = (*ApplyJoin)(nil) - -func (a *ApplyJoin) tableID() semantics.TableSet { - return TableID(a) -} +var _ JoinOp = (*ApplyJoin)(nil) -func (a *ApplyJoin) getLHS() Operator { +func (a *ApplyJoin) GetLHS() ops.Operator { return a.LHS } -func (a *ApplyJoin) getRHS() Operator { +func (a *ApplyJoin) GetRHS() ops.Operator { return a.RHS } -func (a *ApplyJoin) setLHS(operator Operator) { +func (a *ApplyJoin) SetLHS(operator ops.Operator) { a.LHS = operator } -func (a *ApplyJoin) setRHS(operator Operator) { +func (a *ApplyJoin) SetRHS(operator ops.Operator) { a.RHS = operator } -func (a *ApplyJoin) makeInner() { +func (a *ApplyJoin) MakeInner() { a.LeftJoin = false } -func (a *ApplyJoin) isInner() bool { +func (a *ApplyJoin) IsInner() bool { return !a.LeftJoin } -func (a *ApplyJoin) addJoinPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) error { +func (a *ApplyJoin) AddJoinPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) error { bvName, cols, predicate, err := 
BreakExpressionInLHSandRHS(ctx, expr, TableID(a.LHS)) if err != nil { return err diff --git a/go/vt/vtgate/planbuilder/operators/compact.go b/go/vt/vtgate/planbuilder/operators/compact.go deleted file mode 100644 index 2d596cb51db..00000000000 --- a/go/vt/vtgate/planbuilder/operators/compact.go +++ /dev/null @@ -1,108 +0,0 @@ -/* -Copyright 2022 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package operators - -import ( - "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" -) - -// compact will optimise the operator tree into a smaller but equivalent version -func compact(ctx *plancontext.PlanningContext, op Operator) (Operator, error) { - newOp, _, err := rewriteBottomUp(ctx, op, func(ctx *plancontext.PlanningContext, op Operator) (Operator, bool, error) { - newOp, ok := op.(compactable) - if !ok { - return op, false, nil - } - return newOp.compact(ctx) - }) - return newOp, err -} - -func (f *Filter) compact(*plancontext.PlanningContext) (Operator, bool, error) { - if len(f.Predicates) == 0 { - return f.Source, true, nil - } - - other, isFilter := f.Source.(*Filter) - if !isFilter { - return f, false, nil - } - f.Source = other.Source - f.Predicates = append(f.Predicates, other.Predicates...) 
- return f, true, nil -} - -func (u *Union) compact(*plancontext.PlanningContext) (Operator, bool, error) { - var newSources []Operator - anythingChanged := false - for _, source := range u.Sources { - var other *Union - horizon, ok := source.(*Horizon) - if ok { - union, ok := horizon.Source.(*Union) - if ok { - other = union - } - } - if other == nil { - newSources = append(newSources, source) - continue - } - anythingChanged = true - switch { - case len(other.Ordering) == 0 && !other.Distinct: - fallthrough - case u.Distinct: - // if the current UNION is a DISTINCT, we can safely ignore everything from children UNIONs, except LIMIT - newSources = append(newSources, other.Sources...) - - default: - newSources = append(newSources, other) - } - } - if anythingChanged { - u.Sources = newSources - } - return u, anythingChanged, nil -} - -func (j *Join) compact(ctx *plancontext.PlanningContext) (Operator, bool, error) { - if j.LeftJoin { - // we can't merge outer joins into a single QG - return j, false, nil - } - - lqg, lok := j.LHS.(*QueryGraph) - rqg, rok := j.RHS.(*QueryGraph) - if !lok || !rok { - return j, false, nil - } - - newOp := &QueryGraph{ - Tables: append(lqg.Tables, rqg.Tables...), - innerJoins: append(lqg.innerJoins, rqg.innerJoins...), - NoDeps: sqlparser.AndExpressions(lqg.NoDeps, rqg.NoDeps), - } - if j.Predicate != nil { - err := newOp.collectPredicate(ctx, j.Predicate) - if err != nil { - return nil, false, err - } - } - return newOp, true, nil -} diff --git a/go/vt/vtgate/planbuilder/operators/correlated_subquery.go b/go/vt/vtgate/planbuilder/operators/correlated_subquery.go index 8dfebd1990f..d95207f0a7a 100644 --- a/go/vt/vtgate/planbuilder/operators/correlated_subquery.go +++ b/go/vt/vtgate/planbuilder/operators/correlated_subquery.go @@ -18,11 +18,12 @@ package operators import ( "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" ) type ( CorrelatedSubQueryOp struct { - Outer, Inner Operator + 
Outer, Inner ops.Operator Extracted *sqlparser.ExtractedSubquery // JoinCols are the columns from the LHS used for the join. @@ -37,7 +38,7 @@ type ( } SubQueryOp struct { - Outer, Inner Operator + Outer, Inner ops.Operator Extracted *sqlparser.ExtractedSubquery noColumns @@ -45,15 +46,14 @@ type ( } ) -var _ PhysicalOperator = (*SubQueryOp)(nil) -var _ PhysicalOperator = (*CorrelatedSubQueryOp)(nil) +var _ ops.PhysicalOperator = (*SubQueryOp)(nil) +var _ ops.PhysicalOperator = (*CorrelatedSubQueryOp)(nil) // IPhysical implements the PhysicalOperator interface func (s *SubQueryOp) IPhysical() {} -// clone implements the Operator interface -func (s *SubQueryOp) clone(inputs []Operator) Operator { - checkSize(inputs, 2) +// Clone implements the Operator interface +func (s *SubQueryOp) Clone(inputs []ops.Operator) ops.Operator { result := &SubQueryOp{ Outer: inputs[0], Inner: inputs[1], @@ -62,17 +62,16 @@ func (s *SubQueryOp) clone(inputs []Operator) Operator { return result } -// inputs implements the Operator interface -func (s *SubQueryOp) inputs() []Operator { - return []Operator{s.Outer, s.Inner} +// Inputs implements the Operator interface +func (s *SubQueryOp) Inputs() []ops.Operator { + return []ops.Operator{s.Outer, s.Inner} } // IPhysical implements the PhysicalOperator interface func (c *CorrelatedSubQueryOp) IPhysical() {} -// clone implements the Operator interface -func (c *CorrelatedSubQueryOp) clone(inputs []Operator) Operator { - checkSize(inputs, 2) +// Clone implements the Operator interface +func (c *CorrelatedSubQueryOp) Clone(inputs []ops.Operator) ops.Operator { columns := make([]*sqlparser.ColName, len(c.LHSColumns)) copy(columns, c.LHSColumns) vars := make(map[string]int, len(c.Vars)) @@ -90,7 +89,7 @@ func (c *CorrelatedSubQueryOp) clone(inputs []Operator) Operator { return result } -// inputs implements the Operator interface -func (c *CorrelatedSubQueryOp) inputs() []Operator { - return []Operator{c.Outer, c.Inner} +// Inputs implements 
the Operator interface +func (c *CorrelatedSubQueryOp) Inputs() []ops.Operator { + return []ops.Operator{c.Outer, c.Inner} } diff --git a/go/vt/vtgate/planbuilder/operators/delete.go b/go/vt/vtgate/planbuilder/operators/delete.go index 085456b1089..ab5448b07fb 100644 --- a/go/vt/vtgate/planbuilder/operators/delete.go +++ b/go/vt/vtgate/planbuilder/operators/delete.go @@ -18,6 +18,7 @@ package operators import ( "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" "vitess.io/vitess/go/vt/vtgate/semantics" "vitess.io/vitess/go/vt/vtgate/vindexes" ) @@ -33,7 +34,7 @@ type Delete struct { noPredicates } -var _ PhysicalOperator = (*Delete)(nil) +var _ ops.PhysicalOperator = (*Delete)(nil) // Introduces implements the PhysicalOperator interface func (d *Delete) Introduces() semantics.TableSet { @@ -43,9 +44,8 @@ func (d *Delete) Introduces() semantics.TableSet { // IPhysical implements the PhysicalOperator interface func (d *Delete) IPhysical() {} -// clone implements the Operator interface -func (d *Delete) clone(inputs []Operator) Operator { - checkSize(inputs, 0) +// Clone implements the Operator interface +func (d *Delete) Clone(inputs []ops.Operator) ops.Operator { return &Delete{ QTable: d.QTable, VTable: d.VTable, diff --git a/go/vt/vtgate/planbuilder/operators/derived.go b/go/vt/vtgate/planbuilder/operators/derived.go index aa4f2a785d3..21c405611fc 100644 --- a/go/vt/vtgate/planbuilder/operators/derived.go +++ b/go/vt/vtgate/planbuilder/operators/derived.go @@ -19,6 +19,8 @@ package operators import ( "golang.org/x/exp/slices" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" @@ -27,7 +29,7 @@ import ( ) type Derived struct { - Source Operator + Source ops.Operator Query sqlparser.SelectStatement Alias string @@ -38,14 +40,13 @@ type Derived struct { ColumnsOffset []int } -var _ PhysicalOperator = 
(*Derived)(nil) +var _ ops.PhysicalOperator = (*Derived)(nil) // IPhysical implements the PhysicalOperator interface func (d *Derived) IPhysical() {} // Clone implements the Operator interface -func (d *Derived) clone(inputs []Operator) Operator { - checkSize(inputs, 1) +func (d *Derived) Clone(inputs []ops.Operator) ops.Operator { return &Derived{ Source: inputs[0], Query: d.Query, @@ -101,12 +102,12 @@ func (d *Derived) IsMergeable(ctx *plancontext.PlanningContext) bool { return isMergeable(ctx, d.Query, d) } -// inputs implements the Operator interface -func (d *Derived) inputs() []Operator { - return []Operator{d.Source} +// Inputs implements the Operator interface +func (d *Derived) Inputs() []ops.Operator { + return []ops.Operator{d.Source} } -func (d *Derived) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (Operator, error) { +func (d *Derived) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (ops.Operator, error) { if _, isUNion := d.Source.(*Union); isUNion { // If we have a derived table on top of a UNION, we can let the UNION do the expression rewriting var err error diff --git a/go/vt/vtgate/planbuilder/operators/expressions.go b/go/vt/vtgate/planbuilder/operators/expressions.go new file mode 100644 index 00000000000..b128907f93b --- /dev/null +++ b/go/vt/vtgate/planbuilder/operators/expressions.go @@ -0,0 +1,62 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package operators + +import ( + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" + "vitess.io/vitess/go/vt/vtgate/semantics" +) + +// BreakExpressionInLHSandRHS takes an expression and +// extracts the parts that are coming from one of the sides into `ColName`s that are needed +func BreakExpressionInLHSandRHS( + ctx *plancontext.PlanningContext, + expr sqlparser.Expr, + lhs semantics.TableSet, +) (bvNames []string, columns []*sqlparser.ColName, rewrittenExpr sqlparser.Expr, err error) { + rewrittenExpr = sqlparser.CloneExpr(expr) + _ = sqlparser.Rewrite(rewrittenExpr, nil, func(cursor *sqlparser.Cursor) bool { + switch node := cursor.Node().(type) { + case *sqlparser.ColName: + deps := ctx.SemTable.RecursiveDeps(node) + if deps.NumberOfTables() == 0 { + err = vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unknown column. has the AST been copied?") + return false + } + if deps.IsSolvedBy(lhs) { + node.Qualifier.Qualifier = sqlparser.NewIdentifierCS("") + columns = append(columns, node) + bvName := node.CompliantName() + bvNames = append(bvNames, bvName) + arg := sqlparser.NewArgument(bvName) + // we are replacing one of the sides of the comparison with an argument, + // but we don't want to lose the type information we have, so we copy it over + ctx.SemTable.CopyExprInfo(node, arg) + cursor.Replace(arg) + } + } + return true + }) + if err != nil { + return nil, nil, nil, err + } + ctx.JoinPredicates[expr] = append(ctx.JoinPredicates[expr], rewrittenExpr) + return +} diff --git a/go/vt/vtgate/planbuilder/operators/filter.go b/go/vt/vtgate/planbuilder/operators/filter.go index defe6236608..d28511dbe86 100644 --- a/go/vt/vtgate/planbuilder/operators/filter.go +++ b/go/vt/vtgate/planbuilder/operators/filter.go @@ -18,29 +18,30 @@ package operators import ( "vitess.io/vitess/go/vt/sqlparser" + 
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/rewrite" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" "vitess.io/vitess/go/vt/vtgate/semantics" ) type Filter struct { - Source Operator + Source ops.Operator Predicates []sqlparser.Expr } -var _ PhysicalOperator = (*Filter)(nil) +var _ ops.PhysicalOperator = (*Filter)(nil) -func newFilter(op Operator, expr ...sqlparser.Expr) Operator { +func newFilter(op ops.Operator, expr sqlparser.Expr) ops.Operator { return &Filter{ - Source: op, Predicates: expr, + Source: op, Predicates: []sqlparser.Expr{expr}, } } // IPhysical implements the PhysicalOperator interface func (f *Filter) IPhysical() {} -// clone implements the Operator interface -func (f *Filter) clone(inputs []Operator) Operator { - checkSize(inputs, 1) +// Clone implements the Operator interface +func (f *Filter) Clone(inputs []ops.Operator) ops.Operator { predicatesClone := make([]sqlparser.Expr, len(f.Predicates)) copy(predicatesClone, f.Predicates) return &Filter{ @@ -49,9 +50,9 @@ func (f *Filter) clone(inputs []Operator) Operator { } } -// inputs implements the Operator interface -func (f *Filter) inputs() []Operator { - return []Operator{f.Source} +// Inputs implements the Operator interface +func (f *Filter) Inputs() []ops.Operator { + return []ops.Operator{f.Source} } // UnsolvedPredicates implements the unresolved interface @@ -67,7 +68,7 @@ func (f *Filter) UnsolvedPredicates(st *semantics.SemTable) []sqlparser.Expr { return result } -func (f *Filter) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (Operator, error) { +func (f *Filter) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (ops.Operator, error) { newSrc, err := f.Source.AddPredicate(ctx, expr) if err != nil { return nil, err @@ -79,3 +80,17 @@ func (f *Filter) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.E func (f *Filter) AddColumn(ctx 
*plancontext.PlanningContext, expr sqlparser.Expr) (int, error) { return f.Source.AddColumn(ctx, expr) } + +func (f *Filter) Compact(*plancontext.PlanningContext) (ops.Operator, rewrite.TreeIdentity, error) { + if len(f.Predicates) == 0 { + return f.Source, rewrite.NewTree, nil + } + + other, isFilter := f.Source.(*Filter) + if !isFilter { + return f, rewrite.SameTree, nil + } + f.Source = other.Source + f.Predicates = append(f.Predicates, other.Predicates...) + return f, rewrite.NewTree, nil +} diff --git a/go/vt/vtgate/planbuilder/operators/helpers.go b/go/vt/vtgate/planbuilder/operators/helpers.go new file mode 100644 index 00000000000..4571e077089 --- /dev/null +++ b/go/vt/vtgate/planbuilder/operators/helpers.go @@ -0,0 +1,119 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package operators + +import ( + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/rewrite" + "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" + "vitess.io/vitess/go/vt/vtgate/semantics" +) + +// Compact will optimise the operator tree into a smaller but equivalent version +func Compact(ctx *plancontext.PlanningContext, op ops.Operator) (ops.Operator, error) { + type compactable interface { + // Compact implement this interface for operators that have easy to see optimisations + Compact(ctx *plancontext.PlanningContext) (ops.Operator, rewrite.TreeIdentity, error) + } + + newOp, err := rewrite.BottomUp(op, func(op ops.Operator) (ops.Operator, rewrite.TreeIdentity, error) { + newOp, ok := op.(compactable) + if !ok { + return op, rewrite.SameTree, nil + } + return newOp.Compact(ctx) + }) + return newOp, err +} + +func CheckValid(op ops.Operator) error { + type checkable interface { + CheckValid() error + } + + return rewrite.Visit(op, func(this ops.Operator) error { + if chk, ok := this.(checkable); ok { + return chk.CheckValid() + } + return nil + }) +} + +func Clone(op ops.Operator) ops.Operator { + inputs := op.Inputs() + clones := make([]ops.Operator, len(inputs)) + for i, input := range inputs { + clones[i] = Clone(input) + } + return op.Clone(clones) +} + +// TableIDIntroducer is used to signal that this operator introduces data from a new source +type TableIDIntroducer interface { + Introduces() semantics.TableSet +} + +func TableID(op ops.Operator) (result semantics.TableSet) { + _ = rewrite.Visit(op, func(this ops.Operator) error { + if tbl, ok := this.(TableIDIntroducer); ok { + result.MergeInPlace(tbl.Introduces()) + } + return nil + }) + return +} + +func UnresolvedPredicates(op ops.Operator, st *semantics.SemTable) (result []sqlparser.Expr) { + type unresolved interface { + // UnsolvedPredicates returns any predicates that have dependencies on 
the given Operator and + // on the outside of it (a parent Select expression, any other table not used by Operator, etc). + // This is used for sub-queries. An example query could be: + // SELECT * FROM tbl WHERE EXISTS (SELECT 1 FROM otherTbl WHERE tbl.col = otherTbl.col) + // The subquery would have one unsolved predicate: `tbl.col = otherTbl.col` + // It's a predicate that belongs to the inner query, but it needs data from the outer query + // These predicates dictate which data we have to send from the outer side to the inner + UnsolvedPredicates(semTable *semantics.SemTable) []sqlparser.Expr + } + + _ = rewrite.Visit(op, func(this ops.Operator) error { + if tbl, ok := this.(unresolved); ok { + result = append(result, tbl.UnsolvedPredicates(st)...) + } + + return nil + }) + return +} + +func CostOf(op ops.Operator) (cost int) { + type costly interface { + // Cost returns the cost for this operator. All the costly operators in the tree are summed together to get the + // total cost of the operator tree. + // TODO: We should really calculate this using cardinality estimation, + // but until then this is better than nothing + Cost() int + } + + _ = rewrite.Visit(op, func(op ops.Operator) error { + if costlyOp, ok := op.(costly); ok { + cost += costlyOp.Cost() + } + return nil + }) + return +} diff --git a/go/vt/vtgate/planbuilder/operators/horizon.go b/go/vt/vtgate/planbuilder/operators/horizon.go index 7df3ac1a6a8..7bbe3eb9e98 100644 --- a/go/vt/vtgate/planbuilder/operators/horizon.go +++ b/go/vt/vtgate/planbuilder/operators/horizon.go @@ -18,24 +18,25 @@ package operators import ( "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" ) // Horizon is an operator we use until we decide how to handle the source to the horizon. // It contains information about the planning we have to do after deciding how we will send the query to the tablets. 
type Horizon struct { - Source Operator + Source ops.Operator Select sqlparser.SelectStatement noColumns } -var _ Operator = (*Horizon)(nil) -var _ PhysicalOperator = (*Horizon)(nil) +var _ ops.Operator = (*Horizon)(nil) +var _ ops.PhysicalOperator = (*Horizon)(nil) func (h *Horizon) IPhysical() {} -func (h *Horizon) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (Operator, error) { +func (h *Horizon) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (ops.Operator, error) { newSrc, err := h.Source.AddPredicate(ctx, expr) if err != nil { return nil, err @@ -44,14 +45,13 @@ func (h *Horizon) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser. return h, nil } -func (h *Horizon) clone(inputs []Operator) Operator { - checkSize(inputs, 1) +func (h *Horizon) Clone(inputs []ops.Operator) ops.Operator { return &Horizon{ Source: inputs[0], Select: h.Select, } } -func (h *Horizon) inputs() []Operator { - return []Operator{h.Source} +func (h *Horizon) Inputs() []ops.Operator { + return []ops.Operator{h.Source} } diff --git a/go/vt/vtgate/planbuilder/operators/horizon_planning.go b/go/vt/vtgate/planbuilder/operators/horizon_planning.go index adba685fa9e..481d2c6be58 100644 --- a/go/vt/vtgate/planbuilder/operators/horizon_planning.go +++ b/go/vt/vtgate/planbuilder/operators/horizon_planning.go @@ -19,37 +19,42 @@ package operators import ( "errors" - "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/rewrite" + + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" ) var errNotHorizonPlanned = errors.New("query can't be fully operator planned") -func planHorizons(ctx *plancontext.PlanningContext, in Operator) (Operator, error) { - return rewriteBreakableTopDown(ctx, in, func(ctx *plancontext.PlanningContext, in Operator) (Operator, bool, error) { +func planHorizons(in ops.Operator) (ops.Operator, error) { + return 
rewrite.TopDown(in, func(in ops.Operator) (ops.Operator, rewrite.TreeIdentity, rewrite.VisitRule, error) { switch in := in.(type) { case *Horizon: - op, err := planHorizon(ctx, in) + op, err := planHorizon(in) if err != nil { - return nil, false, err + return nil, rewrite.SameTree, rewrite.SkipChildren, err } - return op, true, nil + return op, rewrite.NewTree, rewrite.VisitChildren, nil case *Route: - return in, false, nil + return in, rewrite.SameTree, rewrite.SkipChildren, nil default: - return in, true, nil + return in, rewrite.SameTree, rewrite.VisitChildren, nil } }) } -func planHorizon(ctx *plancontext.PlanningContext, in *Horizon) (Operator, error) { +func planHorizon(in *Horizon) (ops.Operator, error) { rb, isRoute := in.Source.(*Route) - if isRoute && rb.IsSingleShard() { - return planSingleShardRoute(in.Select, rb, in) + if !isRoute { + return in, nil + } + if isRoute && rb.IsSingleShard() && in.Select.GetLimit() == nil { + return planSingleShardRoute(rb, in) } return nil, errNotHorizonPlanned } -func planSingleShardRoute(statement sqlparser.SelectStatement, rb *Route, horizon *Horizon) (Operator, error) { - return nil, errNotHorizonPlanned +func planSingleShardRoute(rb *Route, horizon *Horizon) (ops.Operator, error) { + rb.Source, horizon.Source = horizon, rb.Source + return rb, nil } diff --git a/go/vt/vtgate/planbuilder/operators/join.go b/go/vt/vtgate/planbuilder/operators/join.go index e942320e1c2..3c8a801ca0e 100644 --- a/go/vt/vtgate/planbuilder/operators/join.go +++ b/go/vt/vtgate/planbuilder/operators/join.go @@ -18,24 +18,24 @@ package operators import ( "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/rewrite" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" - "vitess.io/vitess/go/vt/vtgate/semantics" ) // Join represents a join. If we have a predicate, this is an inner join. 
If no predicate exists, it is a cross join type Join struct { - LHS, RHS Operator + LHS, RHS ops.Operator Predicate sqlparser.Expr LeftJoin bool noColumns } -var _ Operator = (*Join)(nil) +var _ ops.Operator = (*Join)(nil) -// clone implements the Operator interface -func (j *Join) clone(inputs []Operator) Operator { - checkSize(inputs, 2) +// Clone implements the Operator interface +func (j *Join) Clone(inputs []ops.Operator) ops.Operator { clone := *j clone.LHS = inputs[0] clone.RHS = inputs[1] @@ -47,19 +47,45 @@ func (j *Join) clone(inputs []Operator) Operator { } } -// inputs implements the Operator interface -func (j *Join) inputs() []Operator { - return []Operator{j.LHS, j.RHS} +// Inputs implements the Operator interface +func (j *Join) Inputs() []ops.Operator { + return []ops.Operator{j.LHS, j.RHS} } -func createOuterJoin(tableExpr *sqlparser.JoinTableExpr, lhs, rhs Operator) (Operator, error) { +func (j *Join) Compact(ctx *plancontext.PlanningContext) (ops.Operator, rewrite.TreeIdentity, error) { + if j.LeftJoin { + // we can't merge outer joins into a single QG + return j, rewrite.SameTree, nil + } + + lqg, lok := j.LHS.(*QueryGraph) + rqg, rok := j.RHS.(*QueryGraph) + if !lok || !rok { + return j, rewrite.SameTree, nil + } + + newOp := &QueryGraph{ + Tables: append(lqg.Tables, rqg.Tables...), + innerJoins: append(lqg.innerJoins, rqg.innerJoins...), + NoDeps: sqlparser.AndExpressions(lqg.NoDeps, rqg.NoDeps), + } + if j.Predicate != nil { + err := newOp.collectPredicate(ctx, j.Predicate) + if err != nil { + return nil, rewrite.SameTree, err + } + } + return newOp, rewrite.NewTree, nil +} + +func createOuterJoin(tableExpr *sqlparser.JoinTableExpr, lhs, rhs ops.Operator) (ops.Operator, error) { if tableExpr.Join == sqlparser.RightJoinType { lhs, rhs = rhs, lhs } return &Join{LHS: lhs, RHS: rhs, LeftJoin: true, Predicate: sqlparser.RemoveKeyspaceFromColName(tableExpr.Condition.On)}, nil } -func createJoin(LHS, RHS Operator) Operator { +func createJoin(LHS, 
RHS ops.Operator) ops.Operator { lqg, lok := LHS.(*QueryGraph) rqg, rok := RHS.(*QueryGraph) if lok && rok { @@ -73,7 +99,7 @@ func createJoin(LHS, RHS Operator) Operator { return &Join{LHS: LHS, RHS: RHS} } -func createInnerJoin(ctx *plancontext.PlanningContext, tableExpr *sqlparser.JoinTableExpr, lhs, rhs Operator) (Operator, error) { +func createInnerJoin(ctx *plancontext.PlanningContext, tableExpr *sqlparser.JoinTableExpr, lhs, rhs ops.Operator) (ops.Operator, error) { op := createJoin(lhs, rhs) if tableExpr.Condition.On != nil { var err error @@ -86,41 +112,37 @@ func createInnerJoin(ctx *plancontext.PlanningContext, tableExpr *sqlparser.Join return op, nil } -func (j *Join) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (Operator, error) { - return addPredicate(j, ctx, expr, false) +func (j *Join) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (ops.Operator, error) { + return AddPredicate(j, ctx, expr, false, newFilter) } -var _ joinOperator = (*Join)(nil) - -func (j *Join) tableID() semantics.TableSet { - return TableID(j) -} +var _ JoinOp = (*Join)(nil) -func (j *Join) getLHS() Operator { +func (j *Join) GetLHS() ops.Operator { return j.LHS } -func (j *Join) getRHS() Operator { +func (j *Join) GetRHS() ops.Operator { return j.RHS } -func (j *Join) setLHS(operator Operator) { +func (j *Join) SetLHS(operator ops.Operator) { j.LHS = operator } -func (j *Join) setRHS(operator Operator) { +func (j *Join) SetRHS(operator ops.Operator) { j.RHS = operator } -func (j *Join) makeInner() { +func (j *Join) MakeInner() { j.LeftJoin = false } -func (j *Join) isInner() bool { +func (j *Join) IsInner() bool { return !j.LeftJoin } -func (j *Join) addJoinPredicate(_ *plancontext.PlanningContext, expr sqlparser.Expr) error { +func (j *Join) AddJoinPredicate(_ *plancontext.PlanningContext, expr sqlparser.Expr) error { j.Predicate = sqlparser.AndExpressions(j.Predicate, expr) return nil } diff --git 
a/go/vt/vtgate/planbuilder/operators/joins.go b/go/vt/vtgate/planbuilder/operators/joins.go new file mode 100644 index 00000000000..a91f6b43ffc --- /dev/null +++ b/go/vt/vtgate/planbuilder/operators/joins.go @@ -0,0 +1,125 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package operators + +import ( + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" + "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" + "vitess.io/vitess/go/vt/vtgate/semantics" +) + +type JoinOp interface { + ops.Operator + GetLHS() ops.Operator + GetRHS() ops.Operator + SetLHS(ops.Operator) + SetRHS(ops.Operator) + MakeInner() + IsInner() bool + AddJoinPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) error +} + +func AddPredicate(join JoinOp, ctx *plancontext.PlanningContext, expr sqlparser.Expr, joinPredicates bool, newFilter func(ops.Operator, sqlparser.Expr) ops.Operator) (ops.Operator, error) { + deps := ctx.SemTable.RecursiveDeps(expr) + switch { + case deps.IsSolvedBy(TableID(join.GetLHS())): + // predicates can always safely be pushed down to the lhs if that is all they depend on + lhs, err := join.GetLHS().AddPredicate(ctx, expr) + if err != nil { + return nil, err + } + join.SetLHS(lhs) + return join, err + case deps.IsSolvedBy(TableID(join.GetRHS())): + // if we are dealing with an outer join, always start by checking if this predicate can turn + // the join into an inner join + if 
!join.IsInner() && canConvertToInner(ctx, expr, TableID(join.GetRHS())) { + join.MakeInner() + } + + if !joinPredicates && !join.IsInner() { + // if we still are dealing with an outer join + // we need to filter after the join has been evaluated + return newFilter(join, expr), nil + } + + // For inner joins, we can just push the filtering on the RHS + rhs, err := join.GetRHS().AddPredicate(ctx, expr) + if err != nil { + return nil, err + } + join.SetRHS(rhs) + return join, err + + case deps.IsSolvedBy(TableID(join)): + // if we are dealing with an outer join, always start by checking if this predicate can turn + // the join into an inner join + if !joinPredicates && !join.IsInner() && canConvertToInner(ctx, expr, TableID(join.GetRHS())) { + join.MakeInner() + } + + if !joinPredicates && !join.IsInner() { + // if we still are dealing with an outer join + // we need to filter after the join has been evaluated + return newFilter(join, expr), nil + } + + err := join.AddJoinPredicate(ctx, expr) + if err != nil { + return nil, err + } + + return join, nil + } + return nil, nil +} + +// we are looking for predicates like `tbl.col = <>` or `<> = tbl.col`, +// where tbl is on the rhs of the left outer join +// When a predicate uses information from an outer table, we can convert from an outer join to an inner join +// if the predicate is "null-intolerant". +// +// Null-intolerant in this context means that the predicate will not be true if the table columns are null. +// +// Since an outer join is an inner join with the addition of all the rows from the left-hand side that +// matched no rows on the right-hand, if we are later going to remove all the rows where the right-hand +// side did not match, we might as well turn the join into an inner join. 
+// +// This is based on the paper "Canonical Abstraction for Outerjoin Optimization" by J Rao et al +func canConvertToInner(ctx *plancontext.PlanningContext, expr sqlparser.Expr, rhs semantics.TableSet) bool { + isColNameFromRHS := func(e sqlparser.Expr) bool { + return sqlparser.IsColName(e) && ctx.SemTable.RecursiveDeps(e).IsSolvedBy(rhs) + } + switch expr := expr.(type) { + case *sqlparser.ComparisonExpr: + if expr.Operator == sqlparser.NullSafeEqualOp { + return false + } + + return isColNameFromRHS(expr.Left) || isColNameFromRHS(expr.Right) + + case *sqlparser.IsExpr: + if expr.Right != sqlparser.IsNotNullOp { + return false + } + + return isColNameFromRHS(expr.Left) + default: + return false + } +} diff --git a/go/vt/vtgate/planbuilder/operators/logical.go b/go/vt/vtgate/planbuilder/operators/logical.go index 1154d05be7e..99dc10e8ad7 100644 --- a/go/vt/vtgate/planbuilder/operators/logical.go +++ b/go/vt/vtgate/planbuilder/operators/logical.go @@ -21,12 +21,13 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" "vitess.io/vitess/go/vt/vtgate/semantics" ) // createLogicalOperatorFromAST creates an operator tree that represents the input SELECT or UNION query -func createLogicalOperatorFromAST(ctx *plancontext.PlanningContext, selStmt sqlparser.Statement) (op Operator, err error) { +func createLogicalOperatorFromAST(ctx *plancontext.PlanningContext, selStmt sqlparser.Statement) (op ops.Operator, err error) { switch node := selStmt.(type) { case *sqlparser.Select: op, err = createOperatorFromSelect(ctx, node) @@ -47,7 +48,7 @@ func createLogicalOperatorFromAST(ctx *plancontext.PlanningContext, selStmt sqlp } // createOperatorFromSelect creates an operator tree that represents the input SELECT query -func createOperatorFromSelect(ctx *plancontext.PlanningContext, sel 
*sqlparser.Select) (Operator, error) { +func createOperatorFromSelect(ctx *plancontext.PlanningContext, sel *sqlparser.Select) (ops.Operator, error) { subq, err := createSubqueryFromStatement(ctx, sel) if err != nil { return nil, err @@ -79,7 +80,7 @@ func createOperatorFromSelect(ctx *plancontext.PlanningContext, sel *sqlparser.S }, nil } -func createOperatorFromUnion(ctx *plancontext.PlanningContext, node *sqlparser.Union) (Operator, error) { +func createOperatorFromUnion(ctx *plancontext.PlanningContext, node *sqlparser.Union) (ops.Operator, error) { opLHS, err := createLogicalOperatorFromAST(ctx, node.Left) if err != nil { return nil, err @@ -96,13 +97,13 @@ func createOperatorFromUnion(ctx *plancontext.PlanningContext, node *sqlparser.U union := &Union{ Distinct: node.Distinct, - Sources: []Operator{opLHS, opRHS}, + Sources: []ops.Operator{opLHS, opRHS}, Ordering: node.OrderBy, } return &Horizon{Source: union, Select: node}, nil } -func createOperatorFromUpdate(ctx *plancontext.PlanningContext, updStmt *sqlparser.Update) (Operator, error) { +func createOperatorFromUpdate(ctx *plancontext.PlanningContext, updStmt *sqlparser.Update) (ops.Operator, error) { tableInfo, qt, err := createQueryTableForDML(ctx, updStmt.TableExprs[0], updStmt.Where) if err != nil { return nil, err @@ -161,7 +162,7 @@ func createOperatorFromUpdate(ctx *plancontext.PlanningContext, updStmt *sqlpars return subq, nil } -func createOperatorFromDelete(ctx *plancontext.PlanningContext, deleteStmt *sqlparser.Delete) (Operator, error) { +func createOperatorFromDelete(ctx *plancontext.PlanningContext, deleteStmt *sqlparser.Delete) (ops.Operator, error) { tableInfo, qt, err := createQueryTableForDML(ctx, deleteStmt.TableExprs[0], deleteStmt.Where) if err != nil { return nil, err @@ -226,7 +227,7 @@ func createOperatorFromDelete(ctx *plancontext.PlanningContext, deleteStmt *sqlp return subq, nil } -func getOperatorFromTableExpr(ctx *plancontext.PlanningContext, tableExpr sqlparser.TableExpr) 
(Operator, error) { +func getOperatorFromTableExpr(ctx *plancontext.PlanningContext, tableExpr sqlparser.TableExpr) (ops.Operator, error) { switch tableExpr := tableExpr.(type) { case *sqlparser.AliasedTableExpr: return getOperatorFromAliasedTableExpr(ctx, tableExpr) @@ -239,7 +240,7 @@ func getOperatorFromTableExpr(ctx *plancontext.PlanningContext, tableExpr sqlpar } } -func getOperatorFromJoinTableExpr(ctx *plancontext.PlanningContext, tableExpr *sqlparser.JoinTableExpr) (Operator, error) { +func getOperatorFromJoinTableExpr(ctx *plancontext.PlanningContext, tableExpr *sqlparser.JoinTableExpr) (ops.Operator, error) { lhs, err := getOperatorFromTableExpr(ctx, tableExpr.LeftExpr) if err != nil { return nil, err @@ -259,7 +260,7 @@ func getOperatorFromJoinTableExpr(ctx *plancontext.PlanningContext, tableExpr *s } } -func getOperatorFromAliasedTableExpr(ctx *plancontext.PlanningContext, tableExpr *sqlparser.AliasedTableExpr) (Operator, error) { +func getOperatorFromAliasedTableExpr(ctx *plancontext.PlanningContext, tableExpr *sqlparser.AliasedTableExpr) (ops.Operator, error) { switch tbl := tableExpr.Expr.(type) { case sqlparser.TableName: tableID := ctx.SemTable.TableSetFor(tableExpr) @@ -301,8 +302,8 @@ func getOperatorFromAliasedTableExpr(ctx *plancontext.PlanningContext, tableExpr } } -func crossJoin(ctx *plancontext.PlanningContext, exprs sqlparser.TableExprs) (Operator, error) { - var output Operator +func crossJoin(ctx *plancontext.PlanningContext, exprs sqlparser.TableExprs) (ops.Operator, error) { + var output ops.Operator for _, tableExpr := range exprs { op, err := getOperatorFromTableExpr(ctx, tableExpr) if err != nil { diff --git a/go/vt/vtgate/planbuilder/operators/operator.go b/go/vt/vtgate/planbuilder/operators/operator.go index ed72f1fd5c4..3fe18d8dbe2 100644 --- a/go/vt/vtgate/planbuilder/operators/operator.go +++ b/go/vt/vtgate/planbuilder/operators/operator.go @@ -34,77 +34,14 @@ The operators go through a few phases while planning: package 
operators import ( - "fmt" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" - "vitess.io/vitess/go/vt/vtgate/semantics" ) type ( - // Operator forms the tree of operators, representing the declarative query provided. - // While planning, the operator tree starts with logical operators, and later moves to physical operators. - // The difference between the two is that when we get to a physical operator, we have made decisions on in - // which order to do the joins, and how to split them up across shards and keyspaces. - // In some situation we go straight to the physical operator - when there are no options to consider, - // we can go straight to the end result. - Operator interface { - clone(inputs []Operator) Operator - inputs() []Operator - - // AddPredicate is used to push predicates. It pushed it as far down as is possible in the tree. - // If we encounter a join and the predicate depends on both sides of the join, the predicate will be split into two parts, - // where data is fetched from the LHS of the join to be used in the evaluation on the RHS - AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (Operator, error) - - // AddColumn tells an operator to also output an additional column specified. - // The offset to the column is returned. 
- AddColumn(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (int, error) - } - - // PhysicalOperator means that this operator is ready to be turned into a logical plan - PhysicalOperator interface { - Operator - IPhysical() - } - - // tableIDIntroducer is used to signal that this operator introduces data from a new source - tableIDIntroducer interface { - Introduces() semantics.TableSet - } - - unresolved interface { - // UnsolvedPredicates returns any predicates that have dependencies on the given Operator and - // on the outside of it (a parent Select expression, any other table not used by Operator, etc). - // This is used for sub-queries. An example query could be: - // SELECT * FROM tbl WHERE EXISTS (SELECT 1 FROM otherTbl WHERE tbl.col = otherTbl.col) - // The subquery would have one unsolved predicate: `tbl.col = otherTbl.col` - // It's a predicate that belongs to the inner query, but it needs data from the outer query - // These predicates dictate which data we have to send from the outer side to the inner - UnsolvedPredicates(semTable *semantics.SemTable) []sqlparser.Expr - } - - costly interface { - // Cost returns the cost for this operator. All the costly operators in the tree are summed together to get the - // total cost of the operator tree. 
- // TODO: We should really calculate this using cardinality estimation, - // but until then this is better than nothing - Cost() int - } - - checkable interface { - // checkValid allows operators that need a final check before being used, to make sure that - // all the necessary information is in the operator - checkValid() error - } - - compactable interface { - // implement this interface for operators that have easy to see optimisations - compact(ctx *plancontext.PlanningContext) (Operator, bool, error) - } - // helper type that implements Inputs() returning nil noInputs struct{} @@ -115,13 +52,13 @@ type ( noPredicates struct{} ) -func PlanQuery(ctx *plancontext.PlanningContext, selStmt sqlparser.Statement) (Operator, error) { +func PlanQuery(ctx *plancontext.PlanningContext, selStmt sqlparser.Statement) (ops.Operator, error) { op, err := createLogicalOperatorFromAST(ctx, selStmt) if err != nil { return nil, err } - if err = checkValid(op); err != nil { + if err = CheckValid(op); err != nil { return nil, err } @@ -130,24 +67,24 @@ func PlanQuery(ctx *plancontext.PlanningContext, selStmt sqlparser.Statement) (O return nil, err } - backup := clone(op) + backup := Clone(op) - op, err = planHorizons(ctx, op) + op, err = planHorizons(op) if err == errNotHorizonPlanned { op = backup } else if err != nil { return nil, err } - if op, err = compact(ctx, op); err != nil { + if op, err = Compact(ctx, op); err != nil { return nil, err } return op, err } -// inputs implements the Operator interface -func (noInputs) inputs() []Operator { +// Inputs implements the Operator interface +func (noInputs) Inputs() []ops.Operator { return nil } @@ -157,124 +94,6 @@ func (noColumns) AddColumn(*plancontext.PlanningContext, sqlparser.Expr) (int, e } // AddPredicate implements the Operator interface -func (noPredicates) AddPredicate(*plancontext.PlanningContext, sqlparser.Expr) (Operator, error) { +func (noPredicates) AddPredicate(*plancontext.PlanningContext, sqlparser.Expr) 
(ops.Operator, error) { return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "this operator cannot accept predicates") } - -func VisitTopDown(root Operator, visitor func(Operator) error) error { - queue := []Operator{root} - for len(queue) > 0 { - this := queue[0] - queue = append(queue[1:], this.inputs()...) - err := visitor(this) - if err != nil { - return err - } - } - return nil -} - -func TableID(op Operator) (result semantics.TableSet) { - _ = VisitTopDown(op, func(this Operator) error { - if tbl, ok := this.(tableIDIntroducer); ok { - result.MergeInPlace(tbl.Introduces()) - } - return nil - }) - return -} - -func unresolvedPredicates(op Operator, st *semantics.SemTable) (result []sqlparser.Expr) { - _ = VisitTopDown(op, func(this Operator) error { - if tbl, ok := this.(unresolved); ok { - result = append(result, tbl.UnsolvedPredicates(st)...) - } - - return nil - }) - return -} - -func checkValid(op Operator) error { - return VisitTopDown(op, func(this Operator) error { - if chk, ok := this.(checkable); ok { - return chk.checkValid() - } - return nil - }) -} - -func CostOf(op Operator) (cost int) { - _ = VisitTopDown(op, func(op Operator) error { - if costlyOp, ok := op.(costly); ok { - cost += costlyOp.Cost() - } - return nil - }) - return -} - -func clone(op Operator) Operator { - inputs := op.inputs() - clones := make([]Operator, len(inputs)) - for i, input := range inputs { - clones[i] = clone(input) - } - return op.clone(clones) -} - -func checkSize(inputs []Operator, shouldBe int) { - if len(inputs) != shouldBe { - panic(fmt.Sprintf("BUG: got the wrong number of inputs: got %d, expected %d", len(inputs), shouldBe)) - } -} - -type rewriterFunc func(*plancontext.PlanningContext, Operator) (newOp Operator, changed bool, err error) -type rewriterBreakableFunc func(*plancontext.PlanningContext, Operator) (newOp Operator, visitChildren bool, err error) - -func rewriteBottomUp(ctx *plancontext.PlanningContext, root Operator, rewriter rewriterFunc) (Operator, 
bool, error) { - oldInputs := root.inputs() - anythingChanged := false - newInputs := make([]Operator, len(oldInputs)) - for i, operator := range oldInputs { - in, changed, err := rewriteBottomUp(ctx, operator, rewriter) - if err != nil { - return nil, false, err - } - if changed { - anythingChanged = true - } - newInputs[i] = in - } - - if anythingChanged { - root = root.clone(newInputs) - } - - newOp, b, err := rewriter(ctx, root) - if err != nil { - return nil, false, err - } - return newOp, anythingChanged || b, nil -} - -func rewriteBreakableTopDown(ctx *plancontext.PlanningContext, in Operator, rewriterF rewriterBreakableFunc) ( - newOp Operator, - err error, -) { - newOp, visitChildren, err := rewriterF(ctx, in) - if err != nil || !visitChildren { - return - } - - oldInputs := newOp.inputs() - newInputs := make([]Operator, len(oldInputs)) - for i, oldInput := range oldInputs { - newInputs[i], err = rewriteBreakableTopDown(ctx, oldInput, rewriterF) - if err != nil { - return - } - } - newOp = newOp.clone(newInputs) - return -} diff --git a/go/vt/vtgate/planbuilder/operators/operator_funcs.go b/go/vt/vtgate/planbuilder/operators/operator_funcs.go index 88d10dddaaa..019c73bf439 100644 --- a/go/vt/vtgate/planbuilder/operators/operator_funcs.go +++ b/go/vt/vtgate/planbuilder/operators/operator_funcs.go @@ -20,14 +20,13 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" - - "vitess.io/vitess/go/vt/vtgate/semantics" ) // RemovePredicate is used when we turn a predicate into a plan operator, // and the predicate needs to be removed as an AST construct -func RemovePredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr, op Operator) (Operator, error) { +func RemovePredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr, op ops.Operator) (ops.Operator, error) { 
switch op := op.(type) { case *Route: newSrc, err := RemovePredicate(ctx, expr, op.Source) @@ -99,141 +98,3 @@ func RemovePredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr, op O return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "this should not happen - tried to remove predicate from table op") } } - -// BreakExpressionInLHSandRHS takes an expression and -// extracts the parts that are coming from one of the sides into `ColName`s that are needed -func BreakExpressionInLHSandRHS( - ctx *plancontext.PlanningContext, - expr sqlparser.Expr, - lhs semantics.TableSet, -) (bvNames []string, columns []*sqlparser.ColName, rewrittenExpr sqlparser.Expr, err error) { - rewrittenExpr = sqlparser.CloneExpr(expr) - _ = sqlparser.Rewrite(rewrittenExpr, nil, func(cursor *sqlparser.Cursor) bool { - switch node := cursor.Node().(type) { - case *sqlparser.ColName: - deps := ctx.SemTable.RecursiveDeps(node) - if deps.NumberOfTables() == 0 { - err = vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unknown column. 
has the AST been copied?") - return false - } - if deps.IsSolvedBy(lhs) { - node.Qualifier.Qualifier = sqlparser.NewIdentifierCS("") - columns = append(columns, node) - bvName := node.CompliantName() - bvNames = append(bvNames, bvName) - arg := sqlparser.NewArgument(bvName) - // we are replacing one of the sides of the comparison with an argument, - // but we don't want to lose the type information we have, so we copy it over - ctx.SemTable.CopyExprInfo(node, arg) - cursor.Replace(arg) - } - } - return true - }) - if err != nil { - return nil, nil, nil, err - } - ctx.JoinPredicates[expr] = append(ctx.JoinPredicates[expr], rewrittenExpr) - return -} - -type joinOperator interface { - Operator - getLHS() Operator - getRHS() Operator - setLHS(Operator) - setRHS(Operator) - makeInner() - isInner() bool - addJoinPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) error -} - -func addPredicate(join joinOperator, ctx *plancontext.PlanningContext, expr sqlparser.Expr, joinPredicates bool) (Operator, error) { - deps := ctx.SemTable.RecursiveDeps(expr) - switch { - case deps.IsSolvedBy(TableID(join.getLHS())): - // predicates can always safely be pushed down to the lhs if that is all they depend on - lhs, err := join.getLHS().AddPredicate(ctx, expr) - if err != nil { - return nil, err - } - join.setLHS(lhs) - return join, err - case deps.IsSolvedBy(TableID(join.getRHS())): - // if we are dealing with an outer join, always start by checking if this predicate can turn - // the join into an inner join - if !join.isInner() && canConvertToInner(ctx, expr, TableID(join.getRHS())) { - join.makeInner() - } - - if !joinPredicates && !join.isInner() { - // if we still are dealing with an outer join - // we need to filter after the join has been evaluated - return newFilter(join, expr), nil - } - - // For inner joins, we can just push the filtering on the RHS - rhs, err := join.getRHS().AddPredicate(ctx, expr) - if err != nil { - return nil, err - } - join.setRHS(rhs) - 
return join, err - - case deps.IsSolvedBy(TableID(join)): - // if we are dealing with an outer join, always start by checking if this predicate can turn - // the join into an inner join - if !joinPredicates && !join.isInner() && canConvertToInner(ctx, expr, TableID(join.getRHS())) { - join.makeInner() - } - - if !joinPredicates && !join.isInner() { - // if we still are dealing with an outer join - // we need to filter after the join has been evaluated - return newFilter(join, expr), nil - } - - err := join.addJoinPredicate(ctx, expr) - if err != nil { - return nil, err - } - - return join, nil - } - return nil, nil -} - -// we are looking for predicates like `tbl.col = <>` or `<> = tbl.col`, -// where tbl is on the rhs of the left outer join -// When a predicate uses information from an outer table, we can convert from an outer join to an inner join -// if the predicate is "null-intolerant". -// -// Null-intolerant in this context means that the predicate will not be true if the table columns are null. -// -// Since an outer join is an inner join with the addition of all the rows from the left-hand side that -// matched no rows on the right-hand, if we are later going to remove all the rows where the right-hand -// side did not match, we might as well turn the join into an inner join. 
-// -// This is based on the paper "Canonical Abstraction for Outerjoin Optimization" by J Rao et al -func canConvertToInner(ctx *plancontext.PlanningContext, expr sqlparser.Expr, rhs semantics.TableSet) bool { - isColNameFromRHS := func(e sqlparser.Expr) bool { - return sqlparser.IsColName(e) && ctx.SemTable.RecursiveDeps(e).IsSolvedBy(rhs) - } - switch expr := expr.(type) { - case *sqlparser.ComparisonExpr: - if expr.Operator == sqlparser.NullSafeEqualOp { - return false - } - - return isColNameFromRHS(expr.Left) || isColNameFromRHS(expr.Right) - - case *sqlparser.IsExpr: - if expr.Right != sqlparser.IsNotNullOp { - return false - } - - return isColNameFromRHS(expr.Left) - default: - return false - } -} diff --git a/go/vt/vtgate/planbuilder/operators/operator_test.go b/go/vt/vtgate/planbuilder/operators/operator_test.go index f2ef43bf19f..4ba5588f22e 100644 --- a/go/vt/vtgate/planbuilder/operators/operator_test.go +++ b/go/vt/vtgate/planbuilder/operators/operator_test.go @@ -105,7 +105,7 @@ func TestOperator(t *testing.T) { ctx := plancontext.NewPlanningContext(nil, semTable, nil, 0) optree, err := createLogicalOperatorFromAST(ctx, stmt) require.NoError(t, err) - optree, err = compact(ctx, optree) + optree, err = Compact(ctx, optree) require.NoError(t, err) output := testString(optree) assert.Equal(t, tc.expected, output) diff --git a/go/vt/vtgate/planbuilder/operators/ops/op.go b/go/vt/vtgate/planbuilder/operators/ops/op.go new file mode 100644 index 00000000000..4deeb5cee1e --- /dev/null +++ b/go/vt/vtgate/planbuilder/operators/ops/op.go @@ -0,0 +1,50 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package ops + +import ( + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" +) + +type ( + // Operator forms the tree of operators, representing the declarative query provided. + // While planning, the operator tree starts with logical operators, and later moves to physical operators. + // The difference between the two is that when we get to a physical operator, we have made decisions on in + // which order to do the joins, and how to split them up across shards and keyspaces. + // In some situations we go straight to the physical operator - when there are no options to consider, + // we can go straight to the end result. + Operator interface { + Clone(inputs []Operator) Operator + Inputs() []Operator + + // AddPredicate is used to push predicates. It pushes the predicate as far down as possible in the tree. + // If we encounter a join and the predicate depends on both sides of the join, the predicate will be split into two parts, + // where data is fetched from the LHS of the join to be used in the evaluation on the RHS + AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (Operator, error) + + // AddColumn tells an operator to also output an additional column specified. + // The offset to the column is returned. 
+ AddColumn(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (int, error) + } + + // PhysicalOperator means that this operator is ready to be turned into a logical plan + PhysicalOperator interface { + Operator + IPhysical() + } +) diff --git a/go/vt/vtgate/planbuilder/operators/querygraph.go b/go/vt/vtgate/planbuilder/operators/querygraph.go index 91a82b335c2..a0adf97dad7 100644 --- a/go/vt/vtgate/planbuilder/operators/querygraph.go +++ b/go/vt/vtgate/planbuilder/operators/querygraph.go @@ -18,6 +18,7 @@ package operators import ( "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" "vitess.io/vitess/go/vt/vtgate/semantics" ) @@ -60,9 +61,9 @@ type ( } ) -var _ Operator = (*QueryGraph)(nil) +var _ ops.Operator = (*QueryGraph)(nil) -// Introduces implements the tableIDIntroducer interface +// Introduces implements the TableIDIntroducer interface func (qg *QueryGraph) Introduces() semantics.TableSet { var ts semantics.TableSet for _, table := range qg.Tables { @@ -174,9 +175,8 @@ func (qg *QueryGraph) UnsolvedPredicates(_ *semantics.SemTable) []sqlparser.Expr return result } -// clone implements the Operator interface -func (qg *QueryGraph) clone(inputs []Operator) Operator { - checkSize(inputs, 0) +// Clone implements the Operator interface +func (qg *QueryGraph) Clone(inputs []ops.Operator) ops.Operator { result := &QueryGraph{ Tables: nil, innerJoins: nil, @@ -189,7 +189,7 @@ func (qg *QueryGraph) clone(inputs []Operator) Operator { return result } -func (qg *QueryGraph) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (Operator, error) { +func (qg *QueryGraph) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (ops.Operator, error) { for _, e := range sqlparser.SplitAndExpression(nil, expr) { err := qg.collectPredicate(ctx, e) if err != nil { diff --git a/go/vt/vtgate/planbuilder/operators/rewrite/rewriters.go 
b/go/vt/vtgate/planbuilder/operators/rewrite/rewriters.go new file mode 100644 index 00000000000..42839c58deb --- /dev/null +++ b/go/vt/vtgate/planbuilder/operators/rewrite/rewriters.go @@ -0,0 +1,128 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rewrite + +import ( + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" +) + +type ( + Func func(ops.Operator) (ops.Operator, TreeIdentity, error) + BreakableFunc func(ops.Operator) (ops.Operator, TreeIdentity, VisitRule, error) + + // TreeIdentity tracks modifications to node and expression trees. + // Only return SameTree when it is acceptable to return the original + // input and discard the returned result as a performance improvement. + TreeIdentity bool + + // VisitRule signals to the rewriter if the children of this operator should be visited or not + VisitRule bool +) + +const ( + SameTree TreeIdentity = false + NewTree TreeIdentity = true + + VisitChildren VisitRule = true + SkipChildren VisitRule = false +) + +// Visit allows for the walking of the operator tree. 
If any error is returned, the walk is aborted +func Visit(root ops.Operator, visitor func(ops.Operator) error) error { + _, err := TopDown(root, func(op ops.Operator) (ops.Operator, TreeIdentity, VisitRule, error) { + err := visitor(op) + if err != nil { + return nil, SameTree, SkipChildren, err + } + return op, SameTree, VisitChildren, nil + }) + return err +} + +// BottomUp rewrites an operator tree from the bottom up. BottomUp applies a transformation function to +// the given operator tree from the bottom up. Each callback [f] returns a TreeIdentity that is aggregated +// into a final output indicating whether the operator tree was changed. +func BottomUp(root ops.Operator, f Func) (ops.Operator, error) { + op, _, err := bottomUp(root, f) + if err != nil { + return nil, err + } + return op, nil +} + +// TopDown applies a transformation function to the given operator tree from the top down. +// Each callback [f] returns a TreeIdentity that is aggregated into a final output indicating whether the +// operator tree was changed. 
+// The callback also returns a VisitRule that signals whether the children of this operator should be visited or not +func TopDown(in ops.Operator, rewriter BreakableFunc) (ops.Operator, error) { + op, _, err := breakableTopDown(in, rewriter) + return op, err +} + +func bottomUp(root ops.Operator, rewriter Func) (ops.Operator, TreeIdentity, error) { + oldInputs := root.Inputs() + anythingChanged := false + newInputs := make([]ops.Operator, len(oldInputs)) + for i, operator := range oldInputs { + in, changed, err := bottomUp(operator, rewriter) + if err != nil { + return nil, SameTree, err + } + if changed == NewTree { + anythingChanged = true + } + newInputs[i] = in + } + + if anythingChanged { + root = root.Clone(newInputs) + } + + newOp, treeIdentity, err := rewriter(root) + if err != nil { + return nil, SameTree, err + } + if anythingChanged { + treeIdentity = NewTree + } + return newOp, treeIdentity, nil +} + +func breakableTopDown(in ops.Operator, rewriter BreakableFunc) (ops.Operator, TreeIdentity, error) { + newOp, identity, visit, err := rewriter(in) + if err != nil || visit == SkipChildren { + return newOp, identity, err + } + + anythingChanged := identity == NewTree + + oldInputs := newOp.Inputs() + newInputs := make([]ops.Operator, len(oldInputs)) + for i, oldInput := range oldInputs { + newInputs[i], identity, err = breakableTopDown(oldInput, rewriter) + anythingChanged = anythingChanged || identity == NewTree + if err != nil { + return nil, SameTree, err + } + } + + if anythingChanged { + return newOp.Clone(newInputs), NewTree, nil + } + + return newOp, SameTree, nil +} diff --git a/go/vt/vtgate/planbuilder/operators/route.go b/go/vt/vtgate/planbuilder/operators/route.go index 981b974b2a8..2015b71f2cb 100644 --- a/go/vt/vtgate/planbuilder/operators/route.go +++ b/go/vt/vtgate/planbuilder/operators/route.go @@ -24,6 +24,7 @@ import ( "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/evalengine" + 
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" "vitess.io/vitess/go/vt/vtgate/semantics" @@ -32,7 +33,7 @@ import ( type ( Route struct { - Source Operator + Source ops.Operator RouteOpCode engine.Opcode Keyspace *vindexes.Keyspace @@ -86,7 +87,7 @@ type ( } ) -var _ PhysicalOperator = (*Route)(nil) +var _ ops.PhysicalOperator = (*Route)(nil) // IPhysical implements the PhysicalOperator interface func (*Route) IPhysical() {} @@ -116,9 +117,8 @@ func (r *Route) Cost() int { return 1 } -// clone implements the Operator interface -func (r *Route) clone(inputs []Operator) Operator { - checkSize(inputs, 1) +// Clone implements the Operator interface +func (r *Route) Clone(inputs []ops.Operator) ops.Operator { cloneRoute := *r cloneRoute.Source = inputs[0] cloneRoute.VindexPreds = make([]*VindexPlusPredicates, len(r.VindexPreds)) @@ -130,9 +130,9 @@ func (r *Route) clone(inputs []Operator) Operator { return &cloneRoute } -// inputs implements the Operator interface -func (r *Route) inputs() []Operator { - return []Operator{r.Source} +// Inputs implements the Operator interface +func (r *Route) Inputs() []ops.Operator { + return []ops.Operator{r.Source} } func (r *Route) UpdateRoutingLogic(ctx *plancontext.PlanningContext, expr sqlparser.Expr) error { @@ -805,7 +805,7 @@ func createRoute(ctx *plancontext.PlanningContext, table *QueryTable, solves sem return plan, nil } -func (r *Route) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (Operator, error) { +func (r *Route) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (ops.Operator, error) { err := r.UpdateRoutingLogic(ctx, expr) if err != nil { return nil, err diff --git a/go/vt/vtgate/planbuilder/operators/route_planning.go b/go/vt/vtgate/planbuilder/operators/route_planning.go index 48ddd585dbb..e121c8b8fee 100644 --- a/go/vt/vtgate/planbuilder/operators/route_planning.go +++ 
b/go/vt/vtgate/planbuilder/operators/route_planning.go @@ -21,6 +21,10 @@ import ( "fmt" "io" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/rewrite" + + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" @@ -40,15 +44,15 @@ type ( left, right semantics.TableSet } - opCacheMap map[tableSetPair]Operator + opCacheMap map[tableSetPair]ops.Operator ) // TransformToPhysical takes an operator tree and rewrites any parts that have not yet been planned as physical operators. // This is where a lot of the optimisations of the query plans are done. // Here we try to merge query parts into the same route primitives. At the end of this process, // all the operators in the tree are guaranteed to be PhysicalOperators -func transformToPhysical(ctx *plancontext.PlanningContext, in Operator) (Operator, error) { - op, _, err := rewriteBottomUp(ctx, in, func(context *plancontext.PlanningContext, operator Operator) (newOp Operator, changed bool, err error) { +func transformToPhysical(ctx *plancontext.PlanningContext, in ops.Operator) (ops.Operator, error) { + op, err := rewrite.BottomUp(in, func(operator ops.Operator) (ops.Operator, rewrite.TreeIdentity, error) { switch op := operator.(type) { case *QueryGraph: return optimizeQueryGraph(ctx, op) @@ -61,7 +65,7 @@ func transformToPhysical(ctx *plancontext.PlanningContext, in Operator) (Operato case *Filter: return optimizeFilter(op) default: - return operator, false, nil + return operator, rewrite.SameTree, nil } }) @@ -69,8 +73,8 @@ func transformToPhysical(ctx *plancontext.PlanningContext, in Operator) (Operato return nil, err } - err = VisitTopDown(op, func(op Operator) error { - if _, isPhys := op.(PhysicalOperator); !isPhys { + err = rewrite.Visit(op, func(op ops.Operator) error { + if _, isPhys := op.(ops.PhysicalOperator); !isPhys { return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to transform %T to a physical 
operator", op) } return nil @@ -79,47 +83,47 @@ func transformToPhysical(ctx *plancontext.PlanningContext, in Operator) (Operato return nil, err } - return compact(ctx, op) + return Compact(ctx, op) } -func optimizeFilter(op *Filter) (Operator, bool, error) { +func optimizeFilter(op *Filter) (ops.Operator, rewrite.TreeIdentity, error) { if route, ok := op.Source.(*Route); ok { // let's push the filter into the route op.Source = route.Source route.Source = op - return route, true, nil + return route, rewrite.NewTree, nil } - return op, false, nil + return op, rewrite.SameTree, nil } -func optimizeDerived(ctx *plancontext.PlanningContext, op *Derived) (Operator, bool, error) { +func optimizeDerived(ctx *plancontext.PlanningContext, op *Derived) (ops.Operator, rewrite.TreeIdentity, error) { innerRoute, ok := op.Source.(*Route) if !ok { - return op, false, nil + return op, rewrite.SameTree, nil } if !(innerRoute.RouteOpCode == engine.EqualUnique) && !op.IsMergeable(ctx) { // no need to check anything if we are sure that we will only hit a single shard - return op, false, nil + return op, rewrite.SameTree, nil } op.Source = innerRoute.Source innerRoute.Source = op - return innerRoute, true, nil + return innerRoute, rewrite.NewTree, nil } -func optimizeJoin(ctx *plancontext.PlanningContext, op *Join) (Operator, bool, error) { +func optimizeJoin(ctx *plancontext.PlanningContext, op *Join) (ops.Operator, rewrite.TreeIdentity, error) { join, err := mergeOrJoin(ctx, op.LHS, op.RHS, sqlparser.SplitAndExpression(nil, op.Predicate), !op.LeftJoin) if err != nil { - return nil, false, err + return nil, rewrite.SameTree, err } - return join, true, nil + return join, rewrite.NewTree, nil } -func optimizeQueryGraph(ctx *plancontext.PlanningContext, op *QueryGraph) (result Operator, changed bool, err error) { - changed = true +func optimizeQueryGraph(ctx *plancontext.PlanningContext, op *QueryGraph) (result ops.Operator, changed rewrite.TreeIdentity, err error) { + changed = 
rewrite.NewTree switch { case ctx.PlannerVersion == querypb.ExecuteOptions_Gen4Left2Right: result, err = leftToRightSolve(ctx, op) @@ -131,7 +135,7 @@ func optimizeQueryGraph(ctx *plancontext.PlanningContext, op *QueryGraph) (resul if len(unresolved) > 0 { // if we have any predicates that none of the joins or tables took care of, // we add a single filter on top, so we don't lose it. This is used for sub-query planning - result = newFilter(result, unresolved...) + result = newFilter(result, sqlparser.AndExpressions(unresolved...)) } return @@ -210,7 +214,7 @@ func getUpdateVindexInformation( and removes the two inputs to this cheapest plan and instead adds the join. As an optimization, it first only considers joining tables that have predicates defined between them */ -func greedySolve(ctx *plancontext.PlanningContext, qg *QueryGraph) (Operator, error) { +func greedySolve(ctx *plancontext.PlanningContext, qg *QueryGraph) (ops.Operator, error) { routeOps, err := seedOperatorList(ctx, qg) planCache := opCacheMap{} if err != nil { @@ -224,13 +228,13 @@ func greedySolve(ctx *plancontext.PlanningContext, qg *QueryGraph) (Operator, er return op, nil } -func leftToRightSolve(ctx *plancontext.PlanningContext, qg *QueryGraph) (Operator, error) { +func leftToRightSolve(ctx *plancontext.PlanningContext, qg *QueryGraph) (ops.Operator, error) { plans, err := seedOperatorList(ctx, qg) if err != nil { return nil, err } - var acc Operator + var acc ops.Operator for _, plan := range plans { if acc == nil { acc = plan @@ -247,8 +251,8 @@ func leftToRightSolve(ctx *plancontext.PlanningContext, qg *QueryGraph) (Operato } // seedOperatorList returns a route for each table in the qg -func seedOperatorList(ctx *plancontext.PlanningContext, qg *QueryGraph) ([]Operator, error) { - plans := make([]Operator, len(qg.Tables)) +func seedOperatorList(ctx *plancontext.PlanningContext, qg *QueryGraph) ([]ops.Operator, error) { + plans := make([]ops.Operator, len(qg.Tables)) // we start by seeding 
the table with the single routes for i, table := range qg.Tables { @@ -338,7 +342,7 @@ func createInfSchemaRoute(ctx *plancontext.PlanningContext, table *QueryTable) ( if err != nil { return nil, err } - var src Operator = &Table{ + var src ops.Operator = &Table{ QTable: table, VTable: &vindexes.Table{ Name: table.Table.Name, @@ -372,7 +376,7 @@ func createInfSchemaRoute(ctx *plancontext.PlanningContext, table *QueryTable) ( return r, nil } -func mergeRoutes(ctx *plancontext.PlanningContext, qg *QueryGraph, physicalOps []Operator, planCache opCacheMap, crossJoinsOK bool) (Operator, error) { +func mergeRoutes(ctx *plancontext.PlanningContext, qg *QueryGraph, physicalOps []ops.Operator, planCache opCacheMap, crossJoinsOK bool) (ops.Operator, error) { if len(physicalOps) == 0 { return nil, nil } @@ -405,17 +409,17 @@ func mergeRoutes(ctx *plancontext.PlanningContext, qg *QueryGraph, physicalOps [ return physicalOps[0], nil } -func removeAt(plans []Operator, idx int) []Operator { +func removeAt(plans []ops.Operator, idx int) []ops.Operator { return append(plans[:idx], plans[idx+1:]...) 
} func findBestJoin( ctx *plancontext.PlanningContext, qg *QueryGraph, - plans []Operator, + plans []ops.Operator, planCache opCacheMap, crossJoinsOK bool, -) (bestPlan Operator, lIdx int, rIdx int, err error) { +) (bestPlan ops.Operator, lIdx int, rIdx int, err error) { for i, lhs := range plans { for j, rhs := range plans { if i == j { @@ -443,7 +447,7 @@ func findBestJoin( return bestPlan, lIdx, rIdx, nil } -func getJoinFor(ctx *plancontext.PlanningContext, cm opCacheMap, lhs, rhs Operator, joinPredicates []sqlparser.Expr) (Operator, error) { +func getJoinFor(ctx *plancontext.PlanningContext, cm opCacheMap, lhs, rhs ops.Operator, joinPredicates []sqlparser.Expr) (ops.Operator, error) { solves := tableSetPair{left: TableID(lhs), right: TableID(rhs)} cachedPlan := cm[solves] if cachedPlan != nil { @@ -460,10 +464,10 @@ func getJoinFor(ctx *plancontext.PlanningContext, cm opCacheMap, lhs, rhs Operat // requiresSwitchingSides will return true if any of the operators with the root from the given operator tree // is of the type that should not be on the RHS of a join -func requiresSwitchingSides(ctx *plancontext.PlanningContext, op Operator) bool { +func requiresSwitchingSides(ctx *plancontext.PlanningContext, op ops.Operator) bool { required := false - _ = VisitTopDown(op, func(current Operator) error { + _ = rewrite.Visit(op, func(current ops.Operator) error { derived, isDerived := current.(*Derived) if isDerived && !derived.IsMergeable(ctx) { @@ -477,7 +481,7 @@ func requiresSwitchingSides(ctx *plancontext.PlanningContext, op Operator) bool return required } -func mergeOrJoin(ctx *plancontext.PlanningContext, lhs, rhs Operator, joinPredicates []sqlparser.Expr, inner bool) (Operator, error) { +func mergeOrJoin(ctx *plancontext.PlanningContext, lhs, rhs ops.Operator, joinPredicates []sqlparser.Expr, inner bool) (ops.Operator, error) { merger := func(a, b *Route) (*Route, error) { return createRouteOperatorForJoin(a, b, joinPredicates, inner) } @@ -496,11 +500,11 @@ 
func mergeOrJoin(ctx *plancontext.PlanningContext, lhs, rhs Operator, joinPredic return nil, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: JOIN not supported between derived tables") } - join := NewApplyJoin(clone(rhs), clone(lhs), nil, !inner) + join := NewApplyJoin(Clone(rhs), Clone(lhs), nil, !inner) return pushJoinPredicates(ctx, joinPredicates, join) } - join := NewApplyJoin(clone(lhs), clone(rhs), nil, !inner) + join := NewApplyJoin(Clone(lhs), Clone(rhs), nil, !inner) return pushJoinPredicates(ctx, joinPredicates, join) } @@ -535,7 +539,7 @@ func createRouteOperatorForJoin(aRoute, bRoute *Route, joinPredicates []sqlparse type mergeFunc func(a, b *Route) (*Route, error) -func operatorsToRoutes(a, b Operator) (*Route, *Route) { +func operatorsToRoutes(a, b ops.Operator) (*Route, *Route) { aRoute, ok := a.(*Route) if !ok { return nil, nil @@ -549,11 +553,11 @@ func operatorsToRoutes(a, b Operator) (*Route, *Route) { func tryMerge( ctx *plancontext.PlanningContext, - a, b Operator, + a, b ops.Operator, joinPredicates []sqlparser.Expr, merger mergeFunc, -) (Operator, error) { - aRoute, bRoute := operatorsToRoutes(clone(a), clone(b)) +) (ops.Operator, error) { + aRoute, bRoute := operatorsToRoutes(Clone(a), Clone(b)) if aRoute == nil || bRoute == nil { return nil, nil } @@ -632,18 +636,18 @@ func isDualTable(route *Route) bool { return src.VTable.Name.String() == "dual" && src.QTable.Table.Qualifier.IsEmpty() } -func leaves(op Operator) (sources []Operator) { +func leaves(op ops.Operator) (sources []ops.Operator) { switch op := op.(type) { // these are the leaves case *Table: - return []Operator{op} + return []ops.Operator{op} // physical case *ApplyJoin: - return []Operator{op.LHS, op.RHS} + return []ops.Operator{op.LHS, op.RHS} case *Filter: - return []Operator{op.Source} + return []ops.Operator{op.Source} case *Route: - return []Operator{op.Source} + return []ops.Operator{op.Source} } panic(fmt.Sprintf("leaves unknown type: %T", op)) @@ -706,7 
+710,7 @@ func canMergeOnFilter(ctx *plancontext.PlanningContext, a, b *Route, predicate s return rVindex == lVindex } -func findColumnVindex(ctx *plancontext.PlanningContext, a Operator, exp sqlparser.Expr) vindexes.SingleColumn { +func findColumnVindex(ctx *plancontext.PlanningContext, a ops.Operator, exp sqlparser.Expr) vindexes.SingleColumn { _, isCol := exp.(*sqlparser.ColName) if !isCol { return nil @@ -731,8 +735,8 @@ func findColumnVindex(ctx *plancontext.PlanningContext, a Operator, exp sqlparse deps := ctx.SemTable.RecursiveDeps(expr) - _ = VisitTopDown(a, func(rel Operator) error { - to, isTableOp := rel.(tableIDIntroducer) + _ = rewrite.Visit(a, func(rel ops.Operator) error { + to, isTableOp := rel.(TableIDIntroducer) if !isTableOp { return nil } @@ -890,13 +894,13 @@ func hexEqual(a, b *sqlparser.Literal) bool { return false } -func pushJoinPredicates(ctx *plancontext.PlanningContext, exprs []sqlparser.Expr, op *ApplyJoin) (Operator, error) { +func pushJoinPredicates(ctx *plancontext.PlanningContext, exprs []sqlparser.Expr, op *ApplyJoin) (ops.Operator, error) { if len(exprs) == 0 { return op, nil } for _, expr := range exprs { - _, err := addPredicate(op, ctx, expr, true) + _, err := AddPredicate(op, ctx, expr, true, newFilter) if err != nil { return nil, err } diff --git a/go/vt/vtgate/planbuilder/operators/subquery.go b/go/vt/vtgate/planbuilder/operators/subquery.go index 9813d94f858..e6dbb2f22ed 100644 --- a/go/vt/vtgate/planbuilder/operators/subquery.go +++ b/go/vt/vtgate/planbuilder/operators/subquery.go @@ -18,13 +18,14 @@ package operators import ( "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" ) type ( // SubQuery stores the information about subquery SubQuery struct { - Outer Operator + Outer ops.Operator Inner []*SubQueryInner noColumns @@ -36,7 +37,7 @@ type ( // Inner is the Operator inside the parenthesis of the subquery. 
// i.e: select (select 1 union select 1), the Inner here would be // of type Concatenate since we have a Union. - Inner Operator + Inner ops.Operator // ExtractedSubquery contains all information we need about this subquery ExtractedSubquery *sqlparser.ExtractedSubquery @@ -46,26 +47,24 @@ type ( } ) -var _ Operator = (*SubQuery)(nil) -var _ Operator = (*SubQueryInner)(nil) +var _ ops.Operator = (*SubQuery)(nil) +var _ ops.Operator = (*SubQueryInner)(nil) -// clone implements the Operator interface -func (s *SubQueryInner) clone(inputs []Operator) Operator { - checkSize(inputs, 1) +// Clone implements the Operator interface +func (s *SubQueryInner) Clone(inputs []ops.Operator) ops.Operator { return &SubQueryInner{ Inner: inputs[0], ExtractedSubquery: s.ExtractedSubquery, } } -// inputs implements the Operator interface -func (s *SubQueryInner) inputs() []Operator { - return []Operator{s.Inner} +// Inputs implements the Operator interface +func (s *SubQueryInner) Inputs() []ops.Operator { + return []ops.Operator{s.Inner} } -// clone implements the Operator interface -func (s *SubQuery) clone(inputs []Operator) Operator { - checkSize(inputs, len(s.Inner)+1) +// Clone implements the Operator interface +func (s *SubQuery) Clone(inputs []ops.Operator) ops.Operator { result := &SubQuery{ Outer: inputs[0], } @@ -79,9 +78,9 @@ func (s *SubQuery) clone(inputs []Operator) Operator { return result } -// inputs implements the Operator interface -func (s *SubQuery) inputs() []Operator { - operators := []Operator{s.Outer} +// Inputs implements the Operator interface +func (s *SubQuery) Inputs() []ops.Operator { + operators := []ops.Operator{s.Outer} for _, inner := range s.Inner { operators = append(operators, inner) } diff --git a/go/vt/vtgate/planbuilder/operators/subquery_planning.go b/go/vt/vtgate/planbuilder/operators/subquery_planning.go index 2b91dd12e9d..745004d195a 100644 --- a/go/vt/vtgate/planbuilder/operators/subquery_planning.go +++ 
b/go/vt/vtgate/planbuilder/operators/subquery_planning.go @@ -21,12 +21,14 @@ import ( "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/evalengine" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/rewrite" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) -func optimizeSubQuery(ctx *plancontext.PlanningContext, op *SubQuery) (Operator, bool, error) { +func optimizeSubQuery(ctx *plancontext.PlanningContext, op *SubQuery) (ops.Operator, rewrite.TreeIdentity, error) { var unmerged []*SubQueryOp // first loop over the subqueries and try to merge them into the outer plan @@ -46,7 +48,7 @@ func optimizeSubQuery(ctx *plancontext.PlanningContext, op *SubQuery) (Operator, } merged, err := tryMergeSubQueryOp(ctx, outer, innerOp, newInner, preds, merger) if err != nil { - return nil, false, err + return nil, rewrite.SameTree, err } if merged != nil { @@ -67,24 +69,24 @@ func optimizeSubQuery(ctx *plancontext.PlanningContext, op *SubQuery) (Operator, if inner.ExtractedSubquery.OpCode == int(engine.PulloutExists) { correlatedTree, err := createCorrelatedSubqueryOp(ctx, innerOp, outer, preds, inner.ExtractedSubquery) if err != nil { - return nil, false, err + return nil, rewrite.SameTree, err } outer = correlatedTree continue } - return nil, false, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: cross-shard correlated subquery") + return nil, rewrite.SameTree, vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: cross-shard correlated subquery") } for _, tree := range unmerged { tree.Outer = outer outer = tree } - return outer, true, nil + return outer, rewrite.NewTree, nil } -func unresolvedAndSource(ctx *plancontext.PlanningContext, op Operator) ([]sqlparser.Expr, Operator) { - preds := unresolvedPredicates(op, ctx.SemTable) +func unresolvedAndSource(ctx *plancontext.PlanningContext, op 
ops.Operator) ([]sqlparser.Expr, ops.Operator) { + preds := UnresolvedPredicates(op, ctx.SemTable) if filter, ok := op.(*Filter); ok { if sqlparser.EqualsExprs(preds, filter.Predicates) { // if we are seeing a single filter with only these predicates, @@ -158,7 +160,7 @@ func mergeSubQueryOp(ctx *plancontext.PlanningContext, outer *Route, inner *Rout return outer, nil } -func isMergeable(ctx *plancontext.PlanningContext, query sqlparser.SelectStatement, op Operator) bool { +func isMergeable(ctx *plancontext.PlanningContext, query sqlparser.SelectStatement, op ops.Operator) bool { validVindex := func(expr sqlparser.Expr) bool { sc := findColumnVindex(ctx, op, expr) return sc != nil && sc.IsUnique() @@ -200,11 +202,11 @@ func isMergeable(ctx *plancontext.PlanningContext, query sqlparser.SelectStateme func tryMergeSubQueryOp( ctx *plancontext.PlanningContext, - outer, subq Operator, + outer, subq ops.Operator, subQueryInner *SubQueryInner, joinPredicates []sqlparser.Expr, merger mergeFunc, -) (Operator, error) { +) (ops.Operator, error) { switch outerOp := outer.(type) { case *Filter: op, err := tryMergeSubQueryOp(ctx, outerOp.Source, subq, subQueryInner, joinPredicates, merger) @@ -224,12 +226,12 @@ func tryMergeSubQueryOp( func tryMergeSubqueryWithRoute( ctx *plancontext.PlanningContext, - subq Operator, + subq ops.Operator, outerOp *Route, joinPredicates []sqlparser.Expr, merger mergeFunc, subQueryInner *SubQueryInner, -) (Operator, error) { +) (ops.Operator, error) { subqueryRoute, isRoute := subq.(*Route) if !isRoute { return nil, nil @@ -281,12 +283,12 @@ func tryMergeSubqueryWithRoute( func tryMergeSubqueryWithJoin( ctx *plancontext.PlanningContext, - subq Operator, + subq ops.Operator, outerOp *ApplyJoin, joinPredicates []sqlparser.Expr, merger mergeFunc, subQueryInner *SubQueryInner, -) (PhysicalOperator, error) { +) (ops.PhysicalOperator, error) { // Trying to merge the subquery with the left-hand or right-hand side of the join if outerOp.LeftJoin { @@ 
-336,10 +338,10 @@ func tryMergeSubqueryWithJoin( // the child of joinTree which does not contain the subquery is the otherTree func rewriteColumnsInSubqueryOpForJoin( ctx *plancontext.PlanningContext, - innerOp Operator, + innerOp ops.Operator, outerTree *ApplyJoin, subQueryInner *SubQueryInner, -) (Operator, error) { +) (ops.Operator, error) { resultInnerOp := innerOp var rewriteError error // go over the entire expression in the subquery @@ -384,7 +386,7 @@ func rewriteColumnsInSubqueryOpForJoin( func createCorrelatedSubqueryOp( ctx *plancontext.PlanningContext, - innerOp, outerOp Operator, + innerOp, outerOp ops.Operator, preds []sqlparser.Expr, extractedSubquery *sqlparser.ExtractedSubquery, ) (*CorrelatedSubQueryOp, error) { diff --git a/go/vt/vtgate/planbuilder/operators/table.go b/go/vt/vtgate/planbuilder/operators/table.go index dca2a76d85a..04a23d4e3e1 100644 --- a/go/vt/vtgate/planbuilder/operators/table.go +++ b/go/vt/vtgate/planbuilder/operators/table.go @@ -20,6 +20,7 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" "vitess.io/vitess/go/vt/vtgate/semantics" "vitess.io/vitess/go/vt/vtgate/vindexes" @@ -39,14 +40,13 @@ type ( } ) -var _ PhysicalOperator = (*Table)(nil) +var _ ops.PhysicalOperator = (*Table)(nil) // IPhysical implements the PhysicalOperator interface func (to *Table) IPhysical() {} -// clone implements the Operator interface -func (to *Table) clone(inputs []Operator) Operator { - checkSize(inputs, 0) +// Clone implements the Operator interface +func (to *Table) Clone([]ops.Operator) ops.Operator { var columns []*sqlparser.ColName for _, name := range to.Columns { columns = append(columns, sqlparser.CloneRefOfColName(name)) @@ -64,7 +64,7 @@ func (to *Table) Introduces() semantics.TableSet { } // AddPredicate implements the PhysicalOperator 
interface -func (to *Table) AddPredicate(_ *plancontext.PlanningContext, expr sqlparser.Expr) (Operator, error) { +func (to *Table) AddPredicate(_ *plancontext.PlanningContext, expr sqlparser.Expr) (ops.Operator, error) { return newFilter(to, expr), nil } diff --git a/go/vt/vtgate/planbuilder/operators/union.go b/go/vt/vtgate/planbuilder/operators/union.go index 23f2827440d..33862f2f467 100644 --- a/go/vt/vtgate/planbuilder/operators/union.go +++ b/go/vt/vtgate/planbuilder/operators/union.go @@ -20,11 +20,13 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/rewrite" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" ) type Union struct { - Sources []Operator + Sources []ops.Operator Distinct bool // TODO this should be removed. For now it's used to fail queries @@ -33,21 +35,20 @@ type Union struct { noColumns } -var _ PhysicalOperator = (*Union)(nil) +var _ ops.PhysicalOperator = (*Union)(nil) // IPhysical implements the PhysicalOperator interface func (u *Union) IPhysical() {} -// clone implements the Operator interface -func (u *Union) clone(inputs []Operator) Operator { +// Clone implements the Operator interface +func (u *Union) Clone(inputs []ops.Operator) ops.Operator { newOp := *u - checkSize(inputs, len(u.Sources)) newOp.Sources = inputs return &newOp } -// inputs implements the Operator interface -func (u *Union) inputs() []Operator { +// Inputs implements the Operator interface +func (u *Union) Inputs() []ops.Operator { return u.Sources } @@ -72,7 +73,7 @@ Notice how `X.col = 42` has been translated to `foo = 42` and `id = 42` on respe The first SELECT of the union dictates the column names, and the second is whatever expression can be found on the same offset. The names of the RHS are discarded. 
*/ -func (u *Union) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (Operator, error) { +func (u *Union) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (ops.Operator, error) { offsets := make(map[string]int) sel, err := u.GetSelectFor(0) if err != nil { @@ -135,9 +136,54 @@ func (u *Union) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Ex } func (u *Union) GetSelectFor(source int) (*sqlparser.Select, error) { - horizon, ok := u.Sources[source].(*Horizon) - if !ok { - return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected all sources of the UNION to be horizons") + src := u.Sources[source] + for { + switch op := src.(type) { + case *Horizon: + return sqlparser.GetFirstSelect(op.Select), nil + case *Route: + src = op.Source + default: + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "expected all sources of the UNION to be horizons") + } } - return sqlparser.GetFirstSelect(horizon.Select), nil +} + +func (u *Union) Compact(*plancontext.PlanningContext) (ops.Operator, rewrite.TreeIdentity, error) { + var newSources []ops.Operator + anythingChanged := false + for _, source := range u.Sources { + var other *Union + horizon, ok := source.(*Horizon) + if ok { + union, ok := horizon.Source.(*Union) + if ok { + other = union + } + } + if other == nil { + newSources = append(newSources, source) + continue + } + anythingChanged = true + switch { + case len(other.Ordering) == 0 && !other.Distinct: + fallthrough + case u.Distinct: + // if the current UNION is a DISTINCT, we can safely ignore everything from children UNIONs, except LIMIT + newSources = append(newSources, other.Sources...) 
+ + default: + newSources = append(newSources, other) + } + } + if anythingChanged { + u.Sources = newSources + } + identity := rewrite.SameTree + if anythingChanged { + identity = rewrite.NewTree + } + + return u, identity, nil } diff --git a/go/vt/vtgate/planbuilder/operators/update.go b/go/vt/vtgate/planbuilder/operators/update.go index d48fab66675..6d3c481bc94 100644 --- a/go/vt/vtgate/planbuilder/operators/update.go +++ b/go/vt/vtgate/planbuilder/operators/update.go @@ -19,6 +19,7 @@ package operators import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtgate/engine" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" "vitess.io/vitess/go/vt/vtgate/semantics" "vitess.io/vitess/go/vt/vtgate/vindexes" ) @@ -36,7 +37,7 @@ type Update struct { noPredicates } -var _ PhysicalOperator = (*Update)(nil) +var _ ops.PhysicalOperator = (*Update)(nil) // Introduces implements the PhysicalOperator interface func (u *Update) Introduces() semantics.TableSet { @@ -46,9 +47,8 @@ func (u *Update) Introduces() semantics.TableSet { // IPhysical implements the PhysicalOperator interface func (u *Update) IPhysical() {} -// clone implements the Operator interface -func (u *Update) clone(inputs []Operator) Operator { - checkSize(inputs, 0) +// Clone implements the Operator interface +func (u *Update) Clone(inputs []ops.Operator) ops.Operator { return &Update{ QTable: u.QTable, VTable: u.VTable, diff --git a/go/vt/vtgate/planbuilder/operators/vindex.go b/go/vt/vtgate/planbuilder/operators/vindex.go index 02e7a0bf0cc..633ce3123fc 100644 --- a/go/vt/vtgate/planbuilder/operators/vindex.go +++ b/go/vt/vtgate/planbuilder/operators/vindex.go @@ -21,6 +21,7 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" "vitess.io/vitess/go/vt/vtgate/semantics" 
"vitess.io/vitess/go/vt/vtgate/vindexes" @@ -58,14 +59,13 @@ func (v *Vindex) Introduces() semantics.TableSet { // IPhysical implements the PhysicalOperator interface func (v *Vindex) IPhysical() {} -// clone implements the Operator interface -func (v *Vindex) clone(inputs []Operator) Operator { - checkSize(inputs, 0) +// Clone implements the Operator interface +func (v *Vindex) Clone([]ops.Operator) ops.Operator { clone := *v return &clone } -var _ PhysicalOperator = (*Vindex)(nil) +var _ ops.PhysicalOperator = (*Vindex)(nil) func (v *Vindex) AddColumn(_ *plancontext.PlanningContext, expr sqlparser.Expr) (int, error) { return addColumn(v, expr) @@ -79,7 +79,7 @@ func (v *Vindex) AddCol(col *sqlparser.ColName) { } // checkValid implements the Operator interface -func (v *Vindex) checkValid() error { +func (v *Vindex) CheckValid() error { if len(v.Table.Predicates) == 0 { return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "unsupported: where clause for vindex function must be of the form id = or id in(,...) (where clause missing)") } @@ -87,7 +87,7 @@ func (v *Vindex) checkValid() error { return nil } -func (v *Vindex) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (Operator, error) { +func (v *Vindex) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (ops.Operator, error) { for _, e := range sqlparser.SplitAndExpression(nil, expr) { deps := ctx.SemTable.RecursiveDeps(e) if deps.NumberOfTables() > 1 {