From 090ae319f47d7c0b707fe21129bf85b59c5b9ff7 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Wed, 17 Dec 2025 15:03:46 +0000 Subject: [PATCH 1/4] Fix cross shard/keyspace joins with derived tables containing a `UNION`. Signed-off-by: Arthur Schreiber --- go/vt/vtgate/planbuilder/operators/union.go | 48 ++++++++++++- .../planbuilder/testdata/union_cases.json | 70 +++++++++++++++++++ 2 files changed, 116 insertions(+), 2 deletions(-) diff --git a/go/vt/vtgate/planbuilder/operators/union.go b/go/vt/vtgate/planbuilder/operators/union.go index f3d2b8e4041..506ff195a87 100644 --- a/go/vt/vtgate/planbuilder/operators/union.go +++ b/go/vt/vtgate/planbuilder/operators/union.go @@ -23,6 +23,7 @@ import ( "vitess.io/vitess/go/slice" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/planbuilder/operators/predicates" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" ) @@ -118,8 +119,26 @@ func (u *Union) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Ex func (u *Union) predicatePerSource(expr sqlparser.Expr, offsets map[string]int) (bool, []sqlparser.Expr) { needsFilter := false exprPerSource := make([]sqlparser.Expr, len(u.Sources)) + + // If the expression is a JoinPredicate, we need to extract the underlying expression + // before iterating over sources. This is because JoinPredicate.Clone() mutates shared + // tracker state during CopyOnRewrite, which would cause subsequent iterations to see + // the already-rewritten expression instead of the original. + baseExpr := expr + if jp, ok := expr.(*predicates.JoinPredicate); ok { + baseExpr = jp.Current() + } + for i := range u.Sources { - predicate := sqlparser.CopyOnRewrite(expr, nil, func(cursor *sqlparser.CopyOnWriteCursor) { + // Clone the base expression for each source to ensure independent rewrites + exprToRewrite := sqlparser.Clone(baseExpr) + + // Check if this source can receive the raw underlying expression or needs the column alias. + // When a source is a Horizon wrapping another Union, we must use the column alias + // so that the inner Union can properly resolve the column reference. + useAlias := u.sourceNeedsAliasRewrite(i) + + predicate := sqlparser.CopyOnRewrite(exprToRewrite, nil, func(cursor *sqlparser.CopyOnWriteCursor) { col, ok := cursor.Node().(*sqlparser.ColName) if !ok { return @@ -137,7 +156,14 @@ func (u *Union) predicatePerSource(expr sqlparser.Expr, offsets map[string]int) if !ok { panic(vterrors.VT09015()) } - cursor.Replace(ae.Expr) + + if useAlias { + // Use the column alias as a simple ColName so nested Unions can resolve it + cursor.Replace(sqlparser.NewColName(ae.ColumnName())) + } else { + // Use the raw underlying expression for direct pushdown + cursor.Replace(ae.Expr) + } }, nil).(sqlparser.Expr) exprPerSource[i] = predicate @@ -145,6 +171,24 @@ func (u *Union) predicatePerSource(expr sqlparser.Expr, offsets map[string]int) return needsFilter, exprPerSource } +// sourceNeedsAliasRewrite checks if a source is a Horizon that wraps another Union. +// In such cases, we need to use the column alias instead of the raw expression +// when rewriting predicates, so the inner Union can properly resolve column references. +func (u *Union) sourceNeedsAliasRewrite(sourceIdx int) bool { + src := u.Sources[sourceIdx] + for { + switch op := src.(type) { + case *Horizon: + _, isUnion := op.Source.(*Union) + return isUnion + case *Route: + src = op.Source + default: + return false + } + } +} + func (u *Union) GetSelectFor(source int) *sqlparser.Select { src := u.Sources[source] for { diff --git a/go/vt/vtgate/planbuilder/testdata/union_cases.json b/go/vt/vtgate/planbuilder/testdata/union_cases.json index 2a6c54c94ef..756bfa7fb98 100644 --- a/go/vt/vtgate/planbuilder/testdata/union_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/union_cases.json @@ -2067,6 +2067,76 @@ "user.user" ] } + }, + { + "comment": "join with derived table containing UNION with aliased columns across different keyspaces", + "query": "select u.id from user as u join (select id as pid from user where id = 1 union select id as pid from unsharded where id = 1) as i on u.id = i.pid", + "plan": { + "Type": "Join", + "QueryType": "SELECT", + "Original": "select u.id from user as u join (select id as pid from user where id = 1 union select id as pid from unsharded where id = 1) as i on u.id = i.pid", + "Instructions": { + "OperatorType": "Join", + "Variant": "Join", + "JoinColumnIndexes": "L:0", + "JoinVars": { + "u_id": 0 + }, + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select u.id from `user` as u where 1 != 1", + "Query": "select u.id from `user` as u" + }, + { + "OperatorType": "Distinct", + "Collations": [ + "(0:1)" + ], + "Inputs": [ + { + "OperatorType": "Concatenate", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "EqualUnique", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select dt.c0 as pid, weight_string(dt.c0) from (select id as pid from `user` where 1 != 1) as dt(c0) where 1 != 1", + "Query": "select dt.c0 as pid, weight_string(dt.c0) from (select distinct id as pid from `user` where id = 1 and id = :u_id) as dt(c0)", + "Values": [ + ":u_id" + ], + "Vindex": "user_index" + }, + { + "OperatorType": "Route", + "Variant": "Unsharded", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "FieldQuery": "select dt.c0 as pid, weight_string(dt.c0) from (select id as pid from unsharded where 1 != 1) as dt(c0) where 1 != 1", + "Query": "select dt.c0 as pid, weight_string(dt.c0) from (select distinct id as pid from unsharded where id = 1 and id = :u_id) as dt(c0)" + } + ] + } + ] + } + ] + }, + "TablesUsed": [ + "main.unsharded", + "user.user" + ] + } } ] From b03c3f8500a6adcfb355e3004edccb0dc514714b Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Thu, 8 Jan 2026 14:56:23 +0000 Subject: [PATCH 2/4] Simplify changes. Signed-off-by: Arthur Schreiber --- go/vt/vtgate/planbuilder/operators/union.go | 45 +++------------------ 1 file changed, 5 insertions(+), 40 deletions(-) diff --git a/go/vt/vtgate/planbuilder/operators/union.go b/go/vt/vtgate/planbuilder/operators/union.go index 506ff195a87..e3979dff330 100644 --- a/go/vt/vtgate/planbuilder/operators/union.go +++ b/go/vt/vtgate/planbuilder/operators/union.go @@ -120,25 +120,13 @@ func (u *Union) predicatePerSource(expr sqlparser.Expr, offsets map[string]int) needsFilter := false exprPerSource := make([]sqlparser.Expr, len(u.Sources)) - // If the expression is a JoinPredicate, we need to extract the underlying expression - // before iterating over sources. This is because JoinPredicate.Clone() mutates shared - // tracker state during CopyOnRewrite, which would cause subsequent iterations to see - // the already-rewritten expression instead of the original. - baseExpr := expr + // Unwrap JoinPredicate if needed if jp, ok := expr.(*predicates.JoinPredicate); ok { - baseExpr = jp.Current() + expr = jp.Current() } for i := range u.Sources { - // Clone the base expression for each source to ensure independent rewrites - exprToRewrite := sqlparser.Clone(baseExpr) - - // Check if this source can receive the raw underlying expression or needs the column alias. - // When a source is a Horizon wrapping another Union, we must use the column alias - // so that the inner Union can properly resolve the column reference. - useAlias := u.sourceNeedsAliasRewrite(i) - - predicate := sqlparser.CopyOnRewrite(exprToRewrite, nil, func(cursor *sqlparser.CopyOnWriteCursor) { + predicate := sqlparser.CopyOnRewrite(expr, nil, func(cursor *sqlparser.CopyOnWriteCursor) { col, ok := cursor.Node().(*sqlparser.ColName) if !ok { return @@ -157,36 +145,13 @@ func (u *Union) predicatePerSource(expr sqlparser.Expr, offsets map[string]int) panic(vterrors.VT09015()) } - if useAlias { - // Use the column alias as a simple ColName so nested Unions can resolve it - cursor.Replace(sqlparser.NewColName(ae.ColumnName())) - } else { - // Use the raw underlying expression for direct pushdown - cursor.Replace(ae.Expr) - } + cursor.Replace(ae.Expr) }, nil).(sqlparser.Expr) exprPerSource[i] = predicate } - return needsFilter, exprPerSource -} -// sourceNeedsAliasRewrite checks if a source is a Horizon that wraps another Union. -// In such cases, we need to use the column alias instead of the raw expression -// when rewriting predicates, so the inner Union can properly resolve column references. -func (u *Union) sourceNeedsAliasRewrite(sourceIdx int) bool { - src := u.Sources[sourceIdx] - for { - switch op := src.(type) { - case *Horizon: - _, isUnion := op.Source.(*Union) - return isUnion - case *Route: - src = op.Source - default: - return false - } - } + return needsFilter, exprPerSource } func (u *Union) GetSelectFor(source int) *sqlparser.Select { From feb4c8e6ca7b2d30e5c792db9390b8c1fd0dbc27 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Tue, 13 Jan 2026 10:26:41 +0000 Subject: [PATCH 3/4] Move unwrapping the join predicate. Signed-off-by: Arthur Schreiber --- go/vt/vtgate/planbuilder/operators/union.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/go/vt/vtgate/planbuilder/operators/union.go b/go/vt/vtgate/planbuilder/operators/union.go index e3979dff330..0e1933cf8bc 100644 --- a/go/vt/vtgate/planbuilder/operators/union.go +++ b/go/vt/vtgate/planbuilder/operators/union.go @@ -104,6 +104,10 @@ func (u *Union) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Ex offsets[ae.ColumnName()] = i } + if jp, ok := expr.(*predicates.JoinPredicate); ok { + expr = jp.Current() + } + needsFilter, exprPerSource := u.predicatePerSource(expr, offsets) if needsFilter { return newFilter(u, expr) @@ -120,11 +124,6 @@ func (u *Union) predicatePerSource(expr sqlparser.Expr, offsets map[string]int) needsFilter := false exprPerSource := make([]sqlparser.Expr, len(u.Sources)) - // Unwrap JoinPredicate if needed - if jp, ok := expr.(*predicates.JoinPredicate); ok { - expr = jp.Current() - } - for i := range u.Sources { predicate := sqlparser.CopyOnRewrite(expr, nil, func(cursor *sqlparser.CopyOnWriteCursor) { col, ok := cursor.Node().(*sqlparser.ColName) From 576e9cbdcf84430d0a118feda8a78b690c6d3927 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Tue, 13 Jan 2026 14:35:38 +0000 Subject: [PATCH 4/4] Mark the `JoinPredicate` as handled. Signed-off-by: Arthur Schreiber --- go/vt/vtgate/planbuilder/operators/union.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/vtgate/planbuilder/operators/union.go b/go/vt/vtgate/planbuilder/operators/union.go index 0e1933cf8bc..ebdc80d6179 100644 --- a/go/vt/vtgate/planbuilder/operators/union.go +++ b/go/vt/vtgate/planbuilder/operators/union.go @@ -106,6 +106,7 @@ func (u *Union) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.Ex if jp, ok := expr.(*predicates.JoinPredicate); ok { expr = jp.Current() + ctx.PredTracker.Skip(jp.ID) } needsFilter, exprPerSource := u.predicatePerSource(expr, offsets)