Skip to content
8 changes: 8 additions & 0 deletions go/slices2/slices.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,11 @@ func Any[T any](s []T, fn func(T) bool) bool {
}
return false
}

func Map[From, To any](in []From, f func(From) To) []To {
result := make([]To, len(in))
for i, col := range in {
result[i] = f(col)
}
return result
}
58 changes: 42 additions & 16 deletions go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,32 +59,58 @@ func transformToLogicalPlan(ctx *plancontext.PlanningContext, op ops.Operator, i
return transformCorrelatedSubQueryPlan(ctx, op)
case *operators.Derived:
return transformDerivedPlan(ctx, op)
case *operators.SimpleProjection:
return transformSimpleProjection(ctx, op)
case *operators.Filter:
plan, err := transformToLogicalPlan(ctx, op.Source, false)
if err != nil {
return nil, err
}
ast := ctx.SemTable.AndExpressions(op.Predicates...)
predicate, err := evalengine.Translate(ast, &evalengine.Config{
return transformFilter(ctx, op)
case *operators.Horizon:
return transformHorizon(ctx, op, isRoot)
}

return nil, vterrors.VT13001(fmt.Sprintf("unknown type encountered: %T (transformToLogicalPlan)", op))
}

func transformFilter(ctx *plancontext.PlanningContext, op *operators.Filter) (logicalPlan, error) {
plan, err := transformToLogicalPlan(ctx, op.Source, false)
if err != nil {
return nil, err
}

predicate := op.FinalPredicate
ast := ctx.SemTable.AndExpressions(op.Predicates...)

// this might already have been done on the operators
if predicate == nil {
predicate, err = evalengine.Translate(ast, &evalengine.Config{
ResolveColumn: resolveFromPlan(ctx, plan, true),
Collation: ctx.SemTable.Collation,
})
if err != nil {
return nil, err
}
}

return &filter{
logicalPlanCommon: newBuilderCommon(plan),
efilter: &engine.Filter{
Predicate: predicate,
ASTPredicate: ast,
},
}, nil
case *operators.Horizon:
return transformHorizon(ctx, op, isRoot)
return &filter{
logicalPlanCommon: newBuilderCommon(plan),
efilter: &engine.Filter{
Predicate: predicate,
ASTPredicate: ast,
},
}, nil
}

func transformSimpleProjection(ctx *plancontext.PlanningContext, op *operators.SimpleProjection) (logicalPlan, error) {
src, err := transformToLogicalPlan(ctx, op.Source, false)
if err != nil {
return nil, err
}

return nil, vterrors.VT13001(fmt.Sprintf("unknown type encountered: %T (transformToLogicalPlan)", op))
return &simpleProjection{
logicalPlanCommon: newBuilderCommon(src),
eSimpleProj: &engine.SimpleProjection{
Cols: op.Columns,
},
}, nil
}

func transformHorizon(ctx *plancontext.PlanningContext, op *operators.Horizon, isRoot bool) (logicalPlan, error) {
Expand Down
66 changes: 44 additions & 22 deletions go/vt/vtgate/planbuilder/operators/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (a *ApplyJoin) AddJoinPredicate(ctx *plancontext.PlanningContext, expr sqlp
return err
}
for i, col := range cols {
offset, err := a.LHS.AddColumn(ctx, col)
offset, err := a.pushColLeft(ctx, aeWrap(col))
if err != nil {
return err
}
Expand All @@ -140,54 +140,76 @@ func (a *ApplyJoin) AddJoinPredicate(ctx *plancontext.PlanningContext, expr sqlp
return nil
}

func (a *ApplyJoin) AddColumn(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (int, error) {
// first check if we already are passing through this expression
for i, existing := range a.ColumnsAST {
if ctx.SemTable.EqualsExpr(existing, expr) {
return i, nil
}
func (a *ApplyJoin) pushColLeft(ctx *plancontext.PlanningContext, e *sqlparser.AliasedExpr) (int, error) {
newLHS, offset, err := a.LHS.AddColumn(ctx, e, true)
if err != nil {
return 0, err
}
a.LHS = newLHS
return offset, nil
}

func (a *ApplyJoin) pushColRight(ctx *plancontext.PlanningContext, e *sqlparser.AliasedExpr) (int, error) {
newRHS, offset, err := a.RHS.AddColumn(ctx, e, true)
if err != nil {
return 0, err
}
a.RHS = newRHS
return offset, nil
}

func (a *ApplyJoin) GetColumns() ([]sqlparser.Expr, error) {
return a.ColumnsAST, nil
}

func (a *ApplyJoin) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, reuseCol bool) (ops.Operator, int, error) {
if offset, found := canReuseColumn(ctx, reuseCol, a.ColumnsAST, expr.Expr); found {
return a, offset, nil
}

lhs := TableID(a.LHS)
rhs := TableID(a.RHS)
both := lhs.Merge(rhs)
deps := ctx.SemTable.RecursiveDeps(expr)
deps := ctx.SemTable.RecursiveDeps(expr.Expr)

// if we get here, it's a new expression we are dealing with.
// We need to decide if we can push it all on either side,
// or if we have to break the expression into left and right parts
switch {
case deps.IsSolvedBy(lhs):
offset, err := a.LHS.AddColumn(ctx, expr)
offset, err := a.pushColLeft(ctx, expr)
if err != nil {
return 0, err
return nil, 0, err
}
a.Columns = append(a.Columns, -offset-1)
case deps.IsSolvedBy(rhs):
offset, err := a.pushColRight(ctx, expr)
if err != nil {
return nil, 0, err
}
a.Columns = append(a.Columns, offset+1)
case deps.IsSolvedBy(both):
bvNames, lhsExprs, rhsExpr, err := BreakExpressionInLHSandRHS(ctx, expr, lhs)
bvNames, lhsExprs, rhsExpr, err := BreakExpressionInLHSandRHS(ctx, expr.Expr, lhs)
if err != nil {
return 0, err
return nil, 0, err
}
for i, lhsExpr := range lhsExprs {
offset, err := a.LHS.AddColumn(ctx, lhsExpr)
offset, err := a.pushColLeft(ctx, aeWrap(lhsExpr))
if err != nil {
return 0, err
return nil, 0, err
}
a.Vars[bvNames[i]] = offset
}
expr = rhsExpr
fallthrough // now we just pass the rest to the RHS of the join
case deps.IsSolvedBy(rhs):
offset, err := a.RHS.AddColumn(ctx, expr)
offset, err := a.pushColRight(ctx, aeWrap(rhsExpr))
if err != nil {
return 0, err
return nil, 0, err
}
a.Columns = append(a.Columns, offset+1)
default:
return 0, vterrors.VT13002(sqlparser.String(expr))
return nil, 0, vterrors.VT13002(sqlparser.String(expr))
}

// the expression wasn't already there - let's add it
a.ColumnsAST = append(a.ColumnsAST, expr)
return len(a.Columns) - 1, nil
a.ColumnsAST = append(a.ColumnsAST, expr.Expr)
return a, len(a.Columns) - 1, nil
}
46 changes: 39 additions & 7 deletions go/vt/vtgate/planbuilder/operators/derived.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package operators
import (
"golang.org/x/exp/slices"

"vitess.io/vitess/go/slices2"

"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops"

"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -132,28 +134,58 @@ func (d *Derived) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.
return d, nil
}

func (d *Derived) AddColumn(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (int, error) {
col, ok := expr.(*sqlparser.ColName)
func (d *Derived) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, reuseCol bool) (ops.Operator, int, error) {
col, ok := expr.Expr.(*sqlparser.ColName)
if !ok {
return 0, vterrors.VT13001("cannot push non-colname expression to a derived table")
return nil, 0, vterrors.VT13001("cannot push non-colname expression to a derived table")
}

if offset, found := canReuseColumn(ctx, reuseCol, d.Columns, col); found {
return d, offset, nil
}

i, err := d.findOutputColumn(col)
if err != nil {
return 0, err
return nil, 0, err
}
var pos int
d.ColumnsOffset, pos = addToIntSlice(d.ColumnsOffset, i)

d.Columns = append(d.Columns, col)
// add it to the source if we were not already passing it through
if i <= -1 {
_, err := d.Source.AddColumn(ctx, sqlparser.NewColName(col.Name.String()))
newSrc, _, err := d.Source.AddColumn(ctx, aeWrap(sqlparser.NewColName(col.Name.String())), true)
if err != nil {
return 0, err
return nil, 0, err
}
d.Source = newSrc
}
return d, pos, nil
}

// canReuseColumn is generic, so it can be used with slices of different types.
// We don't care about the actual type, as long as we know it's a sqlparser.Expr
func canReuseColumn[Expr sqlparser.Expr](
ctx *plancontext.PlanningContext,
reuseCol bool,
columns []Expr,
col sqlparser.Expr,
) (offset int, found bool) {
if !reuseCol {
return
}

for offset, column := range columns {
if ctx.SemTable.EqualsExpr(col, column) {
return offset, true
}
}
return pos, nil

return
}

func (d *Derived) GetColumns() ([]sqlparser.Expr, error) {
return slices2.Map(d.Columns, colNameToExpr), nil
}

func addToIntSlice(columnOffset []int, valToAdd int) ([]int, int) {
Expand Down
18 changes: 16 additions & 2 deletions go/vt/vtgate/planbuilder/operators/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package operators

import (
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/rewrite"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
Expand All @@ -27,6 +28,10 @@ import (
type Filter struct {
Source ops.Operator
Predicates []sqlparser.Expr

// FinalPredicate is the evalengine expression that will finally be used.
// It contains the ANDed predicates in Predicates, with ColName:s replaced by Offset:s
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does Colname:s mean? Also Offest:s?

FinalPredicate evalengine.Expr
}

var _ ops.PhysicalOperator = (*Filter)(nil)
Expand Down Expand Up @@ -77,8 +82,17 @@ func (f *Filter) AddPredicate(ctx *plancontext.PlanningContext, expr sqlparser.E
return f, nil
}

func (f *Filter) AddColumn(ctx *plancontext.PlanningContext, expr sqlparser.Expr) (int, error) {
return f.Source.AddColumn(ctx, expr)
func (f *Filter) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, reuseCol bool) (ops.Operator, int, error) {
newSrc, offset, err := f.Source.AddColumn(ctx, expr, reuseCol)
if err != nil {
return nil, 0, err
}
f.Source = newSrc
return f, offset, nil
}

func (f *Filter) GetColumns() ([]sqlparser.Expr, error) {
return f.Source.GetColumns()
}

func (f *Filter) Compact(*plancontext.PlanningContext) (ops.Operator, rewrite.TreeIdentity, error) {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func Compact(ctx *plancontext.PlanningContext, op ops.Operator) (ops.Operator, e
Compact(ctx *plancontext.PlanningContext) (ops.Operator, rewrite.TreeIdentity, error)
}

newOp, err := rewrite.BottomUp(op, semantics.EmptyTableSet(), TableID, func(_ semantics.TableSet, op ops.Operator) (ops.Operator, rewrite.TreeIdentity, error) {
newOp, err := rewrite.BottomUpAll(op, TableID, func(op ops.Operator, _ semantics.TableSet) (ops.Operator, rewrite.TreeIdentity, error) {
newOp, ok := op.(compactable)
if !ok {
return op, rewrite.SameTree, nil
Expand Down
7 changes: 6 additions & 1 deletion go/vt/vtgate/planbuilder/operators/horizon.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,13 @@ import (
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
)

// Horizon is an operator we use until we decide how to handle the source to the horizon.
// Horizon is an operator that allows us to postpone planning things like SELECT/GROUP BY/ORDER BY/LIMIT until later.
// It contains information about the planning we have to do after deciding how we will send the query to the tablets.
// If we are able to push down the Horizon under a route, we don't have to plan these things separately and can
// just copy over the AST constructs to the query being sent to a tablet.
// If we are not able to push it down, this operator needs to be split up into smaller
// Project/Aggregate/Sort/Limit operations, some which can be pushed down,
// and some that have to be evaluated at the vtgate level.
type Horizon struct {
Source ops.Operator
Select sqlparser.SelectStatement
Expand Down
Loading