Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/test/endtoend/vtgate/queries/aggregation/fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type (
)

func TestFuzzAggregations(t *testing.T) {
t.Skip("dont run on CI for now")
// This test randomizes values and queries, and checks that mysql returns the same values that Vitess does
mcmp, closer := start(t)
defer closer()
Expand Down
37 changes: 24 additions & 13 deletions go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,13 @@ func (a *Aggregator) addColumnWithoutPushing(expr *sqlparser.AliasedExpr, addToG
groupBy.ColOffset = offset
a.Grouping = append(a.Grouping, groupBy)
} else {
aggr := NewAggr(opcode.AggregateRandom, nil, expr, expr.As.String())
var aggr Aggr
switch e := expr.Expr.(type) {
case sqlparser.AggrFunc:
aggr = createAggrFromAggrFunc(e, expr)
default:
aggr = NewAggr(opcode.AggregateRandom, nil, expr, expr.As.String())
}
aggr.ColOffset = offset
a.Aggregations = append(a.Aggregations, aggr)
}
Expand Down Expand Up @@ -159,35 +165,40 @@ func (a *Aggregator) AddColumn(ctx *plancontext.PlanningContext, expr *sqlparser
return a, offset, nil
}

func (a *Aggregator) GetColumns() (columns []*sqlparser.AliasedExpr, err error) {
return a.Columns, nil
}
func (a *Aggregator) GetColumns() ([]*sqlparser.AliasedExpr, error) {
// we update the incoming columns, so we know about any new columns that have been added
columns, err := a.Source.GetColumns()
if err != nil {
return nil, err
}

func (a *Aggregator) Description() ops.OpDescription {
return ops.OpDescription{
OperatorType: "Aggregator",
// if this operator is producing more columns than expected, we want to know about it
if len(columns) > len(a.Columns) {
a.Columns = append(a.Columns, columns[len(a.Columns):]...)
}

return a.Columns, nil
}

func (a *Aggregator) ShortDescription() string {
columnns := slices2.Map(a.Columns, func(from *sqlparser.AliasedExpr) string {
return sqlparser.String(from)
})

org := ""
if a.Original {
org = "ORG "
}

if len(a.Grouping) == 0 {
return strings.Join(columnns, ", ")
return fmt.Sprintf("%s%s", org, strings.Join(columnns, ", "))
}

var grouping []string
for _, gb := range a.Grouping {
grouping = append(grouping, sqlparser.String(gb.SimplifiedExpr))
}

org := ""
if a.Original {
org = "ORG "
}

return fmt.Sprintf("%s%s group by %s", org, strings.Join(columnns, ", "), strings.Join(grouping, ","))
}

Expand Down
15 changes: 0 additions & 15 deletions go/vt/vtgate/planbuilder/operators/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,21 +287,6 @@ func (a *ApplyJoin) addOffset(offset int) {
a.Columns = append(a.Columns, offset)
}

func (a *ApplyJoin) Description() ops.OpDescription {
other := map[string]any{}
if len(a.Columns) > 0 {
other["OutputColumns"] = a.Columns
}
if a.Predicate != nil {
other["Predicate"] = sqlparser.String(a.Predicate)
}
return ops.OpDescription{
OperatorType: "Join",
Variant: "Apply",
Other: other,
}
}

func (a *ApplyJoin) ShortDescription() string {
pred := sqlparser.String(a.Predicate)
columns := slices2.Map(a.ColumnsAST, func(from JoinColumn) string {
Expand Down
14 changes: 0 additions & 14 deletions go/vt/vtgate/planbuilder/operators/correlated_subquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,6 @@ func (s *SubQueryOp) SetInputs(ops []ops.Operator) {
s.Outer, s.Inner = ops[0], ops[1]
}

func (s *SubQueryOp) Description() ops.OpDescription {
return ops.OpDescription{
OperatorType: "SubQuery",
Variant: "Apply",
}
}

func (s *SubQueryOp) ShortDescription() string {
return ""
}
Expand Down Expand Up @@ -114,13 +107,6 @@ func (c *CorrelatedSubQueryOp) SetInputs(ops []ops.Operator) {
c.Outer, c.Inner = ops[0], ops[1]
}

func (c *CorrelatedSubQueryOp) Description() ops.OpDescription {
return ops.OpDescription{
OperatorType: "SubQuery",
Variant: "Correlated",
}
}

func (c *CorrelatedSubQueryOp) ShortDescription() string {
return ""
}
6 changes: 0 additions & 6 deletions go/vt/vtgate/planbuilder/operators/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ func (d *Delete) GetOrdering() ([]ops.OrderBy, error) {
return nil, nil
}

func (d *Delete) Description() ops.OpDescription {
return ops.OpDescription{
OperatorType: "Delete",
}
}

func (d *Delete) ShortDescription() string {
return fmt.Sprintf("%s.%s %s", d.VTable.Keyspace.Name, d.VTable.Name.String(), sqlparser.String(d.AST.Where))
}
6 changes: 0 additions & 6 deletions go/vt/vtgate/planbuilder/operators/derived.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,12 +257,6 @@ func (d *Derived) setQP(qp *QueryProjection) {
d.QP = qp
}

func (d *Derived) Description() ops.OpDescription {
return ops.OpDescription{
OperatorType: "Derived",
}
}

func (d *Derived) ShortDescription() string {
return d.Alias
}
6 changes: 0 additions & 6 deletions go/vt/vtgate/planbuilder/operators/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,6 @@ func (d *Distinct) GetColumns() ([]*sqlparser.AliasedExpr, error) {
return d.Source.GetColumns()
}

func (d *Distinct) Description() ops.OpDescription {
return ops.OpDescription{
OperatorType: "Distinct",
}
}

func (d *Distinct) ShortDescription() string {
return ""
}
Expand Down
36 changes: 15 additions & 21 deletions go/vt/vtgate/planbuilder/operators/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ limitations under the License.
package operators

import (
"strings"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/ops"
"vitess.io/vitess/go/vt/vtgate/planbuilder/operators/rewrite"
Expand Down Expand Up @@ -114,38 +118,28 @@ func (f *Filter) Compact(*plancontext.PlanningContext) (ops.Operator, *rewrite.A
}

func (f *Filter) planOffsets(ctx *plancontext.PlanningContext) error {
resolveColumn := func(col *sqlparser.ColName) (int, error) {
newSrc, offset, err := f.Source.AddColumn(ctx, aeWrap(col), true, false)
if err != nil {
return 0, err
}
f.Source = newSrc
return offset, nil
}
cfg := &evalengine.Config{
ResolveType: ctx.SemTable.TypeForExpr,
Collation: ctx.SemTable.Collation,
ResolveColumn: resolveColumn,
ResolveType: ctx.SemTable.TypeForExpr,
Collation: ctx.SemTable.Collation,
}

eexpr, err := evalengine.Translate(sqlparser.AndExpressions(f.Predicates...), cfg)
predicate := sqlparser.AndExpressions(f.Predicates...)
rewritten, err := useOffsets(ctx, predicate, f)
if err != nil {
return err
}
eexpr, err := evalengine.Translate(rewritten, cfg)
if err != nil {
if strings.HasPrefix(err.Error(), evalengine.ErrTranslateExprNotSupported) {
return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "%s: %s", evalengine.ErrTranslateExprNotSupported, sqlparser.String(predicate))
}
return err
}

f.FinalPredicate = eexpr
return nil
}

func (f *Filter) Description() ops.OpDescription {
return ops.OpDescription{
OperatorType: "Filter",
Other: map[string]any{
"Predicate": sqlparser.String(sqlparser.AndExpressions(f.Predicates...)),
},
}
}

func (f *Filter) ShortDescription() string {
return sqlparser.String(sqlparser.AndExpressions(f.Predicates...))
}
6 changes: 0 additions & 6 deletions go/vt/vtgate/planbuilder/operators/horizon.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,6 @@ func (h *Horizon) setQP(qp *QueryProjection) {
h.QP = qp
}

func (h *Horizon) Description() ops.OpDescription {
return ops.OpDescription{
OperatorType: "Horizon",
}
}

func (h *Horizon) ShortDescription() string {
return ""
}
Loading