Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,11 +289,11 @@ func transformAggregator(ctx *plancontext.PlanningContext, op *operators.Aggrega
oa.aggregates = append(oa.aggregates, aggrParam)
}
for _, groupBy := range op.Grouping {
typ, _ := ctx.SemTable.TypeForExpr(groupBy.SimplifiedExpr)
typ, _ := ctx.SemTable.TypeForExpr(groupBy.Inner)
oa.groupByKeys = append(oa.groupByKeys, &engine.GroupByParams{
KeyCol: groupBy.ColOffset,
WeightStringCol: groupBy.WSOffset,
Expr: groupBy.SimplifiedExpr,
Expr: groupBy.Inner,
Type: typ,
CollationEnv: ctx.VSchema.CollationEnv(),
})
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/SQL_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,7 +457,7 @@ func buildAggregation(op *Aggregator, qb *queryBuilder) {

for _, by := range op.Grouping {
qb.addGroupBy(by.Inner)
simplified := by.SimplifiedExpr
simplified := by.Inner
if by.WSOffset != -1 {
qb.addGroupBy(weightStringFor(simplified))
}
Expand Down
55 changes: 25 additions & 30 deletions go/vt/vtgate/planbuilder/operators/aggregation_pushing.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func pushAggregations(ctx *plancontext.PlanningContext, aggregator *Aggregator,
// doing the aggregating on the vtgate level instead
// Adding to group by can be done only once even though there are multiple distinct aggregation with same expression.
if !distinctAggrGroupByAdded {
groupBy := NewGroupBy(distinctExprs[0], distinctExprs[0])
groupBy := NewGroupBy(distinctExprs[0])
groupBy.ColOffset = aggr.ColOffset
aggrBelowRoute.Grouping = append(aggrBelowRoute.Grouping, groupBy)
distinctAggrGroupByAdded = true
Expand Down Expand Up @@ -260,7 +260,7 @@ func pushAggregationThroughFilter(
withNextColumn:
for _, col := range columnsNeeded {
for _, gb := range pushedAggr.Grouping {
if ctx.SemTable.EqualsExpr(col, gb.SimplifiedExpr) {
if ctx.SemTable.EqualsExpr(col, gb.Inner) {
continue withNextColumn
}
}
Expand Down Expand Up @@ -300,7 +300,7 @@ func collectColNamesNeeded(ctx *plancontext.PlanningContext, f *Filter) (columns

func overlappingUniqueVindex(ctx *plancontext.PlanningContext, groupByExprs []GroupBy) bool {
for _, groupByExpr := range groupByExprs {
if exprHasUniqueVindex(ctx, groupByExpr.SimplifiedExpr) {
if exprHasUniqueVindex(ctx, groupByExpr.Inner) {
return true
}
}
Expand Down Expand Up @@ -387,7 +387,7 @@ func pushAggregationThroughApplyJoin(ctx *plancontext.PlanningContext, rootAggr

// We need to add any columns coming from the lhs of the join to the group by on that side
// If we don't, the LHS will not be able to return the column, and it can't be used to send down to the RHS
addColumnsFromLHSInJoinPredicates(ctx, rootAggr, join, lhs)
addColumnsFromLHSInJoinPredicates(ctx, join, lhs)

join.LHS, join.RHS = lhs.pushed, rhs.pushed

Expand Down Expand Up @@ -419,29 +419,28 @@ func pushAggregationThroughHashJoin(ctx *plancontext.PlanningContext, rootAggr *

// The two sides of the hash comparisons are added as grouping expressions
for _, cmp := range join.JoinComparisons {
lhs.addGrouping(ctx, NewGroupBy(cmp.LHS, cmp.LHS))
lhs.addGrouping(ctx, NewGroupBy(cmp.LHS))
columns.addLeft(cmp.LHS)

rhs.addGrouping(ctx, NewGroupBy(cmp.RHS, cmp.RHS))
rhs.addGrouping(ctx, NewGroupBy(cmp.RHS))
columns.addRight(cmp.RHS)
}

// The grouping columns need to be pushed down as grouping columns on the respective sides
for _, groupBy := range rootAggr.Grouping {
expr := rootAggr.QP.GetSimplifiedExpr(ctx, groupBy.Inner)
deps := ctx.SemTable.RecursiveDeps(expr)
deps := ctx.SemTable.RecursiveDeps(groupBy.Inner)
switch {
case deps.IsSolvedBy(lhs.tableID):
lhs.addGrouping(ctx, groupBy)
columns.addLeft(expr)
columns.addLeft(groupBy.Inner)
case deps.IsSolvedBy(rhs.tableID):
rhs.addGrouping(ctx, groupBy)
columns.addRight(expr)
columns.addRight(groupBy.Inner)
case deps.IsSolvedBy(lhs.tableID.Merge(rhs.tableID)):
// TODO: Support this as well
return nil, nil
default:
panic(vterrors.VT13001(fmt.Sprintf("grouping with bad dependencies %s", groupBy.SimplifiedExpr)))
panic(vterrors.VT13001(fmt.Sprintf("grouping with bad dependencies %s", groupBy.Inner)))
}
}

Expand Down Expand Up @@ -473,29 +472,26 @@ func createJoinPusher(rootAggr *Aggregator, operator Operator) *joinPusher {
}
}

func addColumnsFromLHSInJoinPredicates(ctx *plancontext.PlanningContext, rootAggr *Aggregator, join *ApplyJoin, lhs *joinPusher) {
func addColumnsFromLHSInJoinPredicates(ctx *plancontext.PlanningContext, join *ApplyJoin, lhs *joinPusher) {
for _, pred := range join.JoinPredicates.columns {
for _, bve := range pred.LHSExprs {
expr := bve.Expr
wexpr := rootAggr.QP.GetSimplifiedExpr(ctx, expr)
idx, found := canReuseColumn(ctx, lhs.pushed.Columns, expr, extractExpr)
idx, found := canReuseColumn(ctx, lhs.pushed.Columns, bve.Expr, extractExpr)
if !found {
idx = len(lhs.pushed.Columns)
lhs.pushed.Columns = append(lhs.pushed.Columns, aeWrap(expr))
lhs.pushed.Columns = append(lhs.pushed.Columns, aeWrap(bve.Expr))
}
_, found = canReuseColumn(ctx, lhs.pushed.Grouping, wexpr, func(by GroupBy) sqlparser.Expr {
return by.SimplifiedExpr
_, found = canReuseColumn(ctx, lhs.pushed.Grouping, bve.Expr, func(by GroupBy) sqlparser.Expr {
return by.Inner
})

if found {
continue
}

lhs.pushed.Grouping = append(lhs.pushed.Grouping, GroupBy{
Inner: expr,
SimplifiedExpr: wexpr,
ColOffset: idx,
WSOffset: -1,
Inner: bve.Expr,
ColOffset: idx,
WSOffset: -1,
})
}
}
Expand All @@ -508,24 +504,23 @@ func splitGroupingToLeftAndRight(
columns joinColumns,
) {
for _, groupBy := range rootAggr.Grouping {
expr := rootAggr.QP.GetSimplifiedExpr(ctx, groupBy.Inner)
deps := ctx.SemTable.RecursiveDeps(expr)
deps := ctx.SemTable.RecursiveDeps(groupBy.Inner)
switch {
case deps.IsSolvedBy(lhs.tableID):
lhs.addGrouping(ctx, groupBy)
columns.addLeft(expr)
columns.addLeft(groupBy.Inner)
case deps.IsSolvedBy(rhs.tableID):
rhs.addGrouping(ctx, groupBy)
columns.addRight(expr)
columns.addRight(groupBy.Inner)
case deps.IsSolvedBy(lhs.tableID.Merge(rhs.tableID)):
jc := breakExpressionInLHSandRHSForApplyJoin(ctx, groupBy.SimplifiedExpr, lhs.tableID)
jc := breakExpressionInLHSandRHSForApplyJoin(ctx, groupBy.Inner, lhs.tableID)
for _, lhsExpr := range jc.LHSExprs {
e := lhsExpr.Expr
lhs.addGrouping(ctx, NewGroupBy(e, e))
lhs.addGrouping(ctx, NewGroupBy(e))
}
rhs.addGrouping(ctx, NewGroupBy(jc.RHSExpr, jc.RHSExpr))
rhs.addGrouping(ctx, NewGroupBy(jc.RHSExpr))
default:
panic(vterrors.VT13001(fmt.Sprintf("grouping with bad dependencies %s", groupBy.SimplifiedExpr)))
panic(vterrors.VT13001(fmt.Sprintf("grouping with bad dependencies %s", groupBy.Inner)))
}
}
}
Expand Down
16 changes: 8 additions & 8 deletions go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ func (a *Aggregator) AddPredicate(_ *plancontext.PlanningContext, expr sqlparser
}
}

func (a *Aggregator) addColumnWithoutPushing(ctx *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, addToGroupBy bool) int {
func (a *Aggregator) addColumnWithoutPushing(_ *plancontext.PlanningContext, expr *sqlparser.AliasedExpr, addToGroupBy bool) int {
offset := len(a.Columns)
a.Columns = append(a.Columns, expr)

if addToGroupBy {
groupBy := NewGroupBy(expr.Expr, expr.Expr)
groupBy := NewGroupBy(expr.Expr)
groupBy.ColOffset = offset
a.Grouping = append(a.Grouping, groupBy)
} else {
Expand Down Expand Up @@ -154,7 +154,7 @@ func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gro
// This process also sets the weight string column offset, eliminating the need for a later addition in the aggregator operator's planOffset.
if wsExpr, isWS := rewritten.(*sqlparser.WeightStringFuncExpr); isWS {
idx := slices.IndexFunc(a.Grouping, func(by GroupBy) bool {
return ctx.SemTable.EqualsExprWithDeps(wsExpr.Expr, by.SimplifiedExpr)
return ctx.SemTable.EqualsExprWithDeps(wsExpr.Expr, by.Inner)
})
if idx >= 0 {
a.Grouping[idx].WSOffset = len(a.Columns)
Expand Down Expand Up @@ -241,7 +241,7 @@ func (a *Aggregator) ShortDescription() string {

var grouping []string
for _, gb := range a.Grouping {
grouping = append(grouping, sqlparser.String(gb.SimplifiedExpr))
grouping = append(grouping, sqlparser.String(gb.Inner))
}

return fmt.Sprintf("%s%s group by %s", org, strings.Join(columns, ", "), strings.Join(grouping, ","))
Expand All @@ -268,11 +268,11 @@ func (a *Aggregator) planOffsets(ctx *plancontext.PlanningContext) Operator {
offset := a.internalAddColumn(ctx, aeWrap(gb.Inner), false)
a.Grouping[idx].ColOffset = offset
}
if gb.WSOffset != -1 || !ctx.SemTable.NeedsWeightString(gb.SimplifiedExpr) {
if gb.WSOffset != -1 || !ctx.SemTable.NeedsWeightString(gb.Inner) {
continue
}

offset := a.internalAddColumn(ctx, aeWrap(weightStringFor(gb.SimplifiedExpr)), true)
offset := a.internalAddColumn(ctx, aeWrap(weightStringFor(gb.Inner)), true)
a.Grouping[idx].WSOffset = offset
}

Expand Down Expand Up @@ -371,11 +371,11 @@ func (a *Aggregator) pushRemainingGroupingColumnsAndWeightStrings(ctx *planconte
a.Grouping[idx].ColOffset = offset
}

if gb.WSOffset != -1 || !ctx.SemTable.NeedsWeightString(gb.SimplifiedExpr) {
if gb.WSOffset != -1 || !ctx.SemTable.NeedsWeightString(gb.Inner) {
continue
}

offset := a.internalAddColumn(ctx, aeWrap(weightStringFor(gb.SimplifiedExpr)), false)
offset := a.internalAddColumn(ctx, aeWrap(weightStringFor(gb.Inner)), false)
a.Grouping[idx].WSOffset = offset
}
for idx, aggr := range a.Aggregations {
Expand Down
6 changes: 1 addition & 5 deletions go/vt/vtgate/planbuilder/operators/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,7 @@ type (
func (d *Distinct) planOffsets(ctx *plancontext.PlanningContext) Operator {
columns := d.GetColumns(ctx)
for idx, col := range columns {
e, err := d.QP.TryGetSimplifiedExpr(ctx, col.Expr)
if err != nil {
// ambiguous columns are not a problem for DISTINCT
e = col.Expr
}
e := col.Expr
var wsCol *int
typ, _ := ctx.SemTable.TypeForExpr(e)

Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/operators/horizon_expanding.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ outer:
}
addedToCol := false
for idx, groupBy := range a.Grouping {
if ctx.SemTable.EqualsExprWithDeps(groupBy.SimplifiedExpr, ae.Expr) {
if ctx.SemTable.EqualsExprWithDeps(groupBy.Inner, ae.Expr) {
if !addedToCol {
a.Columns = append(a.Columns, ae)
addedToCol = true
Expand Down Expand Up @@ -214,7 +214,7 @@ func createProjectionForComplexAggregation(a *Aggregator, qp *QueryProjection) O
}
for i, by := range a.Grouping {
a.Grouping[i].ColOffset = len(a.Columns)
a.Columns = append(a.Columns, aeWrap(by.SimplifiedExpr))
a.Columns = append(a.Columns, aeWrap(by.Inner))
}
for i, aggregation := range a.Aggregations {
a.Aggregations[i].ColOffset = len(a.Columns)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/operators/phases.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func addOrderingFor(aggrOp *Aggregator) {

func needsOrdering(ctx *plancontext.PlanningContext, in *Aggregator) bool {
requiredOrder := slice.Map(in.Grouping, func(from GroupBy) sqlparser.Expr {
return from.SimplifiedExpr
return from.Inner
})
if in.DistinctExpr != nil {
requiredOrder = append(requiredOrder, in.DistinctExpr)
Expand Down Expand Up @@ -209,7 +209,7 @@ func addLiteralGroupingToRHS(in *ApplyJoin) (Operator, *ApplyResult) {
}
if len(aggr.Grouping) == 0 {
gb := sqlparser.NewIntLiteral(".0")
aggr.Grouping = append(aggr.Grouping, NewGroupBy(gb, gb))
aggr.Grouping = append(aggr.Grouping, NewGroupBy(gb))
}
return nil
})
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/planbuilder/operators/query_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ func overlaps(ctx *plancontext.PlanningContext, order []OrderBy, grouping []Grou
ordering:
for _, orderBy := range order {
for _, groupBy := range grouping {
if ctx.SemTable.EqualsExprWithDeps(orderBy.SimplifiedExpr, groupBy.SimplifiedExpr) {
if ctx.SemTable.EqualsExprWithDeps(orderBy.SimplifiedExpr, groupBy.Inner) {
continue ordering
}
}
Expand Down Expand Up @@ -674,7 +674,7 @@ func pushOrderingUnderAggr(ctx *plancontext.PlanningContext, order *Ordering, ag
used := make([]bool, len(aggregator.Grouping))
for _, orderExpr := range order.Order {
for grpIdx, by := range aggregator.Grouping {
if !used[grpIdx] && ctx.SemTable.EqualsExprWithDeps(by.SimplifiedExpr, orderExpr.SimplifiedExpr) {
if !used[grpIdx] && ctx.SemTable.EqualsExprWithDeps(by.Inner, orderExpr.SimplifiedExpr) {
newGrouping = append(newGrouping, by)
used[grpIdx] = true
}
Expand Down
Loading