diff --git a/go/vt/vtgate/planbuilder/operators/README.md b/go/vt/vtgate/planbuilder/operators/README.md new file mode 100644 index 00000000000..d2415473aaf --- /dev/null +++ b/go/vt/vtgate/planbuilder/operators/README.md @@ -0,0 +1,253 @@ +# Vitess Query Planning - Operators Package + +The operators package is the heart of Vitess query planning. It transforms SQL queries into an optimized execution plan for distributed MySQL databases, with the primary goal of **pushing as much work as possible down to MySQL shards** rather than processing data in the Vitess proxy layer. + +## Core Philosophy + +The fundamental principle is simple: **anything pushed under a Route will be turned into SQL and sent to MySQL**. Operations that remain above Routes must be executed in the proxy layer, which is significantly slower. Therefore, the planner aggressively tries to merge and push operations down under Routes. + +## Planning Process Overview + +``` +SQL AST → Initial Operator Tree → Phases & Rewriting → Offset Planning → Final Plan +``` + +### 1. Initial Plan Construction +- Build operator tree from parsed SQL +- Create **QueryGraphs** at leaves (inner-joined tables) +- Wrap complex operations in **Horizon** operators +- Establish the foundation for optimization + +### 2. Phased Planning & Rewriting +- Run multiple optimization phases +- Use fixed-point rewriting until no more changes occur +- Each phase targets specific query patterns + +### 3. Offset Planning +- Transform AST expressions to offsets or evalengine expressions +- Prepare for execution + +## Key Operator Types + +### Route +**The most important operator** - represents work sent to MySQL shards. +- Contains SQL to execute on one or more shards +- Different routing strategies (single shard, scattered, etc.) +- Goal: Push as many operations under Routes as possible + +### Horizon +A container for post-join operations that we hope to push down: +- SELECT expressions +- GROUP BY and aggregation +- ORDER BY +- LIMIT/OFFSET +- If pushable → goes under Route, otherwise expands into individual operators + +### QueryGraph (QG) +- Represents tables connected by inner joins +- Allows flexible join reordering +- Gets planned into optimized join sequences +- Eventually becomes Route(s) or ApplyJoin(s) + +### Join vs ApplyJoin +- **Join**: Logical operator for tracking outer joins +- **ApplyJoin**: Physical join algorithm (nested loop) +- **HashJoin**: Alternative physical join algorithm + +### Core Operation Operators +When Horizons can't be pushed down, they expand into: +- **Aggregator**: GROUP BY and aggregate functions +- **Projection**: SELECT expression evaluation +- **Filter**: WHERE clause processing +- **Ordering**: ORDER BY sorting +- **Limit**: LIMIT/OFFSET processing +- **Distinct**: DISTINCT operation + +## Planning Phases + +The planner runs through several phases, each targeting specific optimizations: + +```go +const ( + physicalTransform // Basic setup + initialPlanning // Initial horizon planning optimization + pullDistinctFromUnion // Pull distinct from UNION + delegateAggregation // Split aggregation between vtgate and mysql + recursiveCTEHorizons // Expand recursive CTE horizons + addAggrOrdering // Optimize aggregations with ORDER BY + cleanOutPerfDistinct // Optimize Distinct operations + subquerySettling // Settle subqueries + dmlWithInput // Expand update/delete to dml with input +) +``` + +Each phase only runs if relevant to the query (e.g., aggregation phases skip if no GROUP BY). + +## Routing Strategies + +Different routing types handle various distribution patterns: + +- **ShardedRouting**: Complex routing based on vindex predicates +- **AnyShardRouting**: Reference tables available on any shard +- **DualRouting**: MySQL's dual table (single row, no real table) +- **InfoSchemaRouting**: Information schema queries +- **NoneRouting**: Known to return empty results, no need to actually run the query. +- **TargetedRouting**: Specific shard targeting + +## Push-Down Optimizations + +The rewriter system uses a visitor pattern to optimize the tree: + +```go +func runRewriters(ctx *plancontext.PlanningContext, root Operator) Operator { + visitor := func(in Operator, _ semantics.TableSet, isRoot bool) (Operator, *ApplyResult) { + switch in := in.(type) { + case *Horizon: + return pushOrExpandHorizon(ctx, in) + case *ApplyJoin: + return tryMergeApplyJoin(in, ctx) + case *Projection: + return tryPushProjection(ctx, in) + case *Limit: + return tryPushLimit(ctx, in) + // ... more optimizations + } + } + return FixedPointBottomUp(root, TableID, visitor, stopAtRoute) +} +``` + +### Key Optimizations + +**Horizon Push/Expand**: +``` +If possible: Horizon → Route (push entire horizon to MySQL) +Otherwise: Horizon → Aggregator + Ordering + Limit + ... (expand to individual ops) +``` + +**ApplyJoin Merging**: +``` +ApplyJoin(Route1, Route2) → Route(ApplyJoin(table1, table2)) +``` +Merges two routes into one, pushing the join to MySQL. + +**Limit Optimization**: +- Single shard: Push entire LIMIT down +- Multi-shard: Push LIMIT down but keep LIMIT on top for global limiting + +**Filter Push-Down**: +``` +Filter → Route (push WHERE conditions to MySQL) +``` + +## Multi-Shard Challenges + +When data spans multiple shards, certain operations require special handling: + +### LIMIT Example +```sql +SELECT * FROM users LIMIT 10 +``` + +**Single Shard**: `LIMIT 10` sent directly to MySQL +**Multi-Shard (4 shards)**: +- Send `LIMIT 10` to each shard (gets ≤40 rows total) +- Apply `LIMIT 10` in proxy layer (final 10 rows) + +### Aggregation Example +```sql +SELECT COUNT(*) FROM users GROUP BY country +``` + +**Pushable**: Single shard or sharded by `country` +**Split aggregation**: +- MySQL: `SELECT country, COUNT(*) FROM users GROUP BY country` +- Proxy: Sum the counts per country + +## Join Merging Logic + +The join merging system (`join_merging.go`) determines when two Routes can be merged: + +```go +switch { +case a == dual: // Dual can merge with single-shard routes +case a == anyShard && sameKeyspace: // Reference tables merge easily +case a == sharded && b == sharded: // Complex vindex-based merging +default: // Cannot merge +} +``` + +**Dual Routing Special Case**: +- Before merge: `:lhs_col = rhs.col` (parameter-based predicate) +- After merge: `lhs.col = rhs.col` (regular predicate) +- PredTracker handles this transformation + +## Horizon Expansion + +When a Horizon can't be pushed under a Route, it expands systematically: + +```go +// Original: Horizon(SELECT col1, COUNT(*) FROM t GROUP BY col1 ORDER BY col1 LIMIT 10) +// +// Expands to: +Limit(10, + Ordering(col1, + Aggregator(COUNT(*), GROUP BY col1, + Projection(col1, + Route(...))))) +``` + +## Error Handling & Fallbacks + +The planner includes sophisticated fallback mechanisms: + +- **Failed optimizations**: NoRewrite result, try alternative approaches +- **Unsupported features**: Fall back to proxy-layer processing +- **Complex queries**: Expand Horizons into manageable components + +## Debugging Support + +Set `DebugOperatorTree = true` to see the operator tree at each phase: + +``` +PHASE: initial horizon planning optimization +Route(SelectEqual(1) sharded) + └─ Horizon(SELECT id, name FROM users WHERE id = :id) + +PHASE: delegateAggregation +Route(SelectEqual(1) sharded) + └─ Aggregator(COUNT(*)) + └─ QueryGraph(users) +``` + +## Performance Considerations + +**Critical Path**: Every operation above Routes adds latency and resource usage in the proxy layer. + +**Memory Usage**: Large result sets processed in proxy require significant memory. + +**Network Traffic**: Multiple round-trips between proxy and MySQL for complex operations. + +**Optimization Priority**: +1. Push entire queries to single Routes +2. Merge multiple Routes where possible +3. Push individual operations as far down as possible +4. Minimize proxy-layer processing + +## Usage Example + +```go +// Build initial operator tree from AST +op := createOperatorFromAST(ctx, stmt) + +// Run planning phases +optimized := runPhases(ctx, op) + +// Add offset planning +final := planOffsets(ctx, optimized) + +// Convert to execution engine +primitive := convertToPrimitive(final) +``` + +The result is an optimized execution plan that maximizes MySQL utilization while minimizing proxy-layer overhead, enabling Vitess to efficiently handle complex queries across distributed MySQL clusters. \ No newline at end of file diff --git a/go/vt/vtgate/planbuilder/operators/join_merging.go b/go/vt/vtgate/planbuilder/operators/join_merging.go index af825f0c0df..e557ebfee62 100644 --- a/go/vt/vtgate/planbuilder/operators/join_merging.go +++ b/go/vt/vtgate/planbuilder/operators/join_merging.go @@ -31,6 +31,7 @@ import ( func (jm *joinMerger) mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs Operator) *Route { lhsRoute, rhsRoute, routingA, routingB, a, b, sameKeyspace := prepareInputRoutes(ctx, lhs, rhs) if lhsRoute == nil { + debugNoRewrite("apply join merge blocked: LHS or RHS is not a Route") return nil } @@ -48,6 +49,7 @@ func (jm *joinMerger) mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs } if !(jm.joinType.IsInner() || newRouting.OpCode().IsSingleShard()) { + debugNoRewrite("apply join merge blocked: dual routing with non-inner join type %s and multi-shard routing %s", jm.joinType.ToString(), newRouting.OpCode().String()) return nil } return jm.merge(ctx, lhsRoute, rhsRoute, newRouting) @@ -75,13 +77,27 @@ func (jm *joinMerger) mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs // infoSchema routing is complex, so we handle it in a separate method case a == infoSchema && b == infoSchema: - return tryMergeInfoSchemaRoutings(ctx, routingA, routingB, jm, lhsRoute, rhsRoute) + result := tryMergeInfoSchemaRoutings(ctx, routingA, routingB, jm, lhsRoute, rhsRoute) + if result == nil { + debugNoRewrite("apply join merge blocked: info schema routing merge failed") + } + return result // sharded routing is complex, so we handle it in a separate method case a == sharded && b == sharded: - return tryMergeShardedRouting(ctx, lhsRoute, rhsRoute, jm, jm.predicates) + result := tryMergeShardedRouting(ctx, lhsRoute, rhsRoute, jm, jm.predicates) + if result == nil { + debugNoRewrite("apply join merge blocked: sharded routing merge failed (different keyspaces or incompatible vindex predicates)") + } + return result default: + if !sameKeyspace { + debugNoRewrite("apply join merge blocked: routes target different keyspaces (LHS: %s %s, RHS: %s %s)", + a.String(), getKeyspaceName(routingA), b.String(), getKeyspaceName(routingB)) + } else { + debugNoRewrite("apply join merge blocked: incompatible routing types (LHS: %s, RHS: %s)", a.String(), b.String()) + } return nil } } @@ -238,6 +254,13 @@ func mergeShardedRouting(r1 *ShardedRouting, r2 *ShardedRouting) *ShardedRouting return tr } +func getKeyspaceName(routing Routing) string { + if ks := routing.Keyspace(); ks != nil { + return ks.Name + } + return "" +} + func (jm *joinMerger) merge(ctx *plancontext.PlanningContext, op1, op2 *Route, r Routing, conditions ...engine.Condition) *Route { aj := NewApplyJoin(ctx, op1.Source, op2.Source, ctx.SemTable.AndExpressions(jm.predicates...), jm.joinType, false) for _, column := range aj.JoinPredicates.columns { diff --git a/go/vt/vtgate/planbuilder/operators/query_planning.go b/go/vt/vtgate/planbuilder/operators/query_planning.go index b2aba1caf6e..f39b4596d4d 100644 --- a/go/vt/vtgate/planbuilder/operators/query_planning.go +++ b/go/vt/vtgate/planbuilder/operators/query_planning.go @@ -131,6 +131,7 @@ func tryMergeApplyJoin(in *ApplyJoin, ctx *plancontext.PlanningContext) (_ Opera jm := newJoinMerge(preds, in.JoinType) r := jm.mergeJoinInputs(ctx, in.LHS, in.RHS) if r == nil { + // Specific failure reason already logged by mergeJoinInputs return in, NoRewrite } aj, ok := r.Source.(*ApplyJoin) @@ -177,6 +178,7 @@ func tryMergeApplyJoin(in *ApplyJoin, ctx *plancontext.PlanningContext) (_ Opera success := "pushed ApplyJoin under Route" if !ok { // Unexpected scenario: LHS is not a Route; abort rewrite. + debugNoRewrite("apply join merge blocked: LHS is not a Route") return in, NoRewrite } @@ -186,6 +188,7 @@ func tryMergeApplyJoin(in *ApplyJoin, ctx *plancontext.PlanningContext) (_ Opera !(jm.joinType.IsInner() || r.Routing.OpCode().IsSingleShard()) { // to check the resulting opcode, we've used the original predicates. // Since we are not using them, we need to restore the argument versions of the predicates + debugNoRewrite("apply join merge blocked: dual routing with non-inner join and multi-shard target") return in, NoRewrite } @@ -196,6 +199,7 @@ func tryPushDelete(in *Delete) (Operator, *ApplyResult) { if src, ok := in.Source.(*Route); ok { return pushDMLUnderRoute(in, src, "pushed delete under route") } + debugNoRewrite("delete push blocked: source is not a Route") return in, NoRewrite } @@ -203,6 +207,7 @@ func tryPushUpdate(in *Update) (Operator, *ApplyResult) { if src, ok := in.Source.(*Route); ok { return pushDMLUnderRoute(in, src, "pushed update under route") } + debugNoRewrite("update push blocked: source is not a Route") return in, NoRewrite } @@ -256,6 +261,7 @@ func pushOrExpandHorizon(ctx *plancontext.PlanningContext, in *Horizon) (Operato } if !reachedPhase(ctx, initialPlanning) { + debugNoRewrite("horizon push blocked: not reached initialPlanning phase") return in, NoRewrite } @@ -286,6 +292,21 @@ func pushOrExpandHorizon(ctx *plancontext.PlanningContext, in *Horizon) (Operato return Swap(in, rb, "push horizon into route") } + // Debug why we can't push the horizon + if !isRoute { + debugNoRewrite("horizon push blocked: source is not a Route") + } else if hasHaving { + debugNoRewrite("horizon push blocked: query has HAVING clause") + } else if needsOrdering { + debugNoRewrite("horizon push blocked: query has ORDER BY") + } else if qp.NeedsAggregation() { + debugNoRewrite("horizon push blocked: query needs aggregation") + } else if isDistinctAST(in.selectStatement()) { + debugNoRewrite("horizon push blocked: query has DISTINCT") + } else if in.selectStatement().GetLimit() != nil { + debugNoRewrite("horizon push blocked: query has LIMIT") + } + return expandHorizon(ctx, in) } @@ -294,10 +315,12 @@ func tryPushLimit(ctx *plancontext.PlanningContext, in *Limit) (Operator, *Apply case *Route: return tryPushingDownLimitInRoute(ctx, in, src) case *Aggregator: + debugNoRewrite("limit push blocked: cannot push limit under aggregator") return in, NoRewrite case *ApplyJoin: if in.Pushed { // This is the Top limit, and it's already pushed down + debugNoRewrite("limit push blocked: limit already pushed down") return in, NoRewrite } side := "RHS" @@ -318,6 +341,7 @@ func tryPushLimit(ctx *plancontext.PlanningContext, in *Limit) (Operator, *Apply case *Limit: combinedLimit := mergeLimits(in.AST, src.AST) if combinedLimit == nil { + debugNoRewrite("limit push blocked: cannot merge limits") break } // we can remove the other LIMIT @@ -530,6 +554,7 @@ func tryPushingDownLimitInRoute(ctx *plancontext.PlanningContext, in *Limit, src // this limit has already been pushed down, nothing to do here if in.Pushed { + debugNoRewrite("limit push blocked: limit already pushed down in route") return in, NoRewrite } @@ -545,6 +570,7 @@ func tryPushingDownLimitInRoute(ctx *plancontext.PlanningContext, in *Limit, src func setUpperLimit(in *Limit) (Operator, *ApplyResult) { if in.Pushed { + debugNoRewrite("limit push blocked: upper limit already set") return in, NoRewrite } in.Pushed = true @@ -586,6 +612,7 @@ func tryPushOrdering(ctx *plancontext.PlanningContext, in *Ordering) (Operator, if canPushLeft(ctx, src, in.Order) { return pushOrderLeftOfJoin(src, in) } + debugNoRewrite("ordering push blocked: cannot push ordering to left side of apply join") case *Ordering: // we'll just remove the order underneath. The top order replaces whatever was incoming in.Source = src.Source @@ -594,6 +621,7 @@ func tryPushOrdering(ctx *plancontext.PlanningContext, in *Ordering) (Operator, return pushOrderingUnderProjection(ctx, in, src) case *Aggregator: if !src.QP.AlignGroupByAndOrderBy(ctx) && !overlaps(ctx, in.Order, src.Grouping) { + debugNoRewrite("ordering push blocked: GROUP BY and ORDER BY cannot be aligned and don't overlap") return in, NoRewrite } @@ -601,6 +629,7 @@ func tryPushOrdering(ctx *plancontext.PlanningContext, in *Ordering) (Operator, case *SubQueryContainer: return pushOrderingToOuterOfSubqueryContainer(ctx, in, src) } + debugNoRewrite("ordering push blocked: unsupported source operator type %T", in.Source) return in, NoRewrite } @@ -609,6 +638,7 @@ func pushOrderingToOuterOfSubqueryContainer(ctx *plancontext.PlanningContext, in for _, order := range in.Order { deps := ctx.SemTable.RecursiveDeps(order.Inner.Expr) if !deps.IsSolvedBy(outerTableID) { + debugNoRewrite("ordering push blocked: order expression depends on inner query tables") return in, NoRewrite } } @@ -620,15 +650,18 @@ func pushOrderingUnderProjection(ctx *plancontext.PlanningContext, in *Ordering, // we can move ordering under a projection if it's not introducing a column we're sorting by for _, by := range in.Order { if !mustFetchFromInput(ctx, by.SimplifiedExpr) { + debugNoRewrite("ordering push blocked: order expression introduces new column") return in, NoRewrite } } ap, ok := proj.Columns.(AliasedProjections) if !ok { + debugNoRewrite("ordering push blocked: projection columns not aliased") return in, NoRewrite } for _, projExpr := range ap { if projExpr.Info != nil { + debugNoRewrite("ordering push blocked: projection has expression with metadata") return in, NoRewrite } } @@ -766,6 +799,7 @@ func tryPushFilter(ctx *plancontext.PlanningContext, in *Filter) (Operator, *App for _, pred := range in.Predicates { deps := ctx.SemTable.RecursiveDeps(pred) if !deps.IsSolvedBy(outerTableID) { + debugNoRewrite("filter push blocked: predicate depends on inner subquery tables") return in, NoRewrite } } @@ -778,6 +812,7 @@ func tryPushFilter(ctx *plancontext.PlanningContext, in *Filter) (Operator, *App other, isFilter := in.Source.(*Filter) if !isFilter { + debugNoRewrite("filter push blocked: source is not a filter for merging") return in, NoRewrite } in.Source = other.Source @@ -785,6 +820,7 @@ func tryPushFilter(ctx *plancontext.PlanningContext, in *Filter) (Operator, *App return in, Rewrote("two filters merged into one") } + debugNoRewrite("filter push blocked: unsupported source operator type %T", in.Source) return in, NoRewrite } @@ -805,6 +841,7 @@ func pushFilterUnderProjection(ctx *plancontext.PlanningContext, filter *Filter, }, p) if cantPush { + debugNoRewrite("filter push blocked: predicate needs evaluation in projection") return filter, NoRewrite } } @@ -813,6 +850,7 @@ func pushFilterUnderProjection(ctx *plancontext.PlanningContext, filter *Filter, func tryPushDistinct(in *Distinct) (Operator, *ApplyResult) { if in.Required && in.PushedPerformance { + debugNoRewrite("distinct push blocked: distinct is required and performance optimization already applied") return in, NoRewrite } switch src := in.Source.(type) { @@ -825,6 +863,7 @@ func tryPushDistinct(in *Distinct) (Operator, *ApplyResult) { } if isDistinct(src.Source) { + debugNoRewrite("distinct push blocked: source already has distinct") return in, NoRewrite } @@ -858,6 +897,7 @@ func tryPushDistinct(in *Distinct) (Operator, *ApplyResult) { return in, Rewrote("remove ordering under distinct") } + debugNoRewrite("distinct push blocked: unsupported source operator type %T", in.Source) return in, NoRewrite } @@ -907,6 +947,7 @@ func tryPushUnion(ctx *plancontext.PlanningContext, op *Union) (Operator, *Apply } if len(sources) == len(op.Sources) { + debugNoRewrite("union push blocked: no sources could be merged") return op, NoRewrite } return newUnion(sources, selects, op.unionColumns, op.distinct), Rewrote("merge union inputs") diff --git a/go/vt/vtgate/planbuilder/operators/rewriters.go b/go/vt/vtgate/planbuilder/operators/rewriters.go index e7e131ada57..6d06a10dee2 100644 --- a/go/vt/vtgate/planbuilder/operators/rewriters.go +++ b/go/vt/vtgate/planbuilder/operators/rewriters.go @@ -64,6 +64,12 @@ func Rewrote(message string) *ApplyResult { return &ApplyResult{Transformations: []Rewrite{{Message: message}}} } +func debugNoRewrite(reason string, args ...interface{}) { + if DebugOperatorTree { + fmt.Printf("NoRewrite: "+reason+"\n", args...) + } +} + func (ar *ApplyResult) Merge(other *ApplyResult) *ApplyResult { if ar == nil { return other