Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
595d8d8
initial aggregator operator move to new horizon planner
harshit-gangal Apr 28, 2023
a9918af
added ordering for grouping key in horizon planning
harshit-gangal Apr 28, 2023
329d5b7
gen4 planner: start of push aggregations through joins
systay Apr 29, 2023
11ad8fb
planner feat: split aggregation across joins
systay May 3, 2023
4d0c986
ordering push down on aggregations
harshit-gangal May 3, 2023
7dbaeb7
aggregation grouping based on the user provided ordering
harshit-gangal May 3, 2023
56933ea
gen4 planner refactor: use visitor pattern
systay May 3, 2023
8907650
gen4 test: make TestOne enable operator debug printing
systay May 3, 2023
1c78485
gen4 planner: fix groupby in shortDescription
systay May 3, 2023
9bd77cb
fall back when we are doing distinct aggregations or aggregations on …
systay May 3, 2023
cee9df6
group by visitor returning column index
harshit-gangal May 4, 2023
4f81544
make sure that the projection under the aggregation makes sense
systay May 5, 2023
4e9adcb
feat: keep better track of offsets in aggregation planning
GuptaManan100 May 8, 2023
3db4cb9
feat: add code to figure out grouping can be pushed down in the prese…
GuptaManan100 May 8, 2023
4f66c23
aggregator to add grouping column if not part of select expression an…
harshit-gangal May 11, 2023
85265db
wip planner refactoring: support more aggregations
systay May 12, 2023
94a1dbc
gen4 refactoring: stop compact from going down into the route
systay May 13, 2023
1a06abc
update more test cases
systay May 13, 2023
99c178d
handle aggr functions not supported in vtgate
systay May 13, 2023
ea22160
once under the route, no need to be clever about pushing columns
systay May 13, 2023
95aa81a
make sure to order by the correct expression
systay May 13, 2023
eda0deb
Various operator fixes
systay May 13, 2023
d93d95f
make sure that the old horizon planning still works as before
systay May 13, 2023
c747827
add end2end tests for the new grouping and ordering capabilities
systay May 13, 2023
f03b34d
refactor aggregation handling
systay May 13, 2023
2a8f28d
update test expectations
systay May 13, 2023
6cad2fd
add more aggregation tests
systay May 13, 2023
5f8fe3f
handle min/max in the new aggregation planning
systay May 13, 2023
8f98c76
refactor. clean up. make pretty
systay May 13, 2023
ffe7666
join engine fix: pass bind vars with type value when left side have e…
harshit-gangal May 15, 2023
f6f3de5
projection pushing to use reserved vars to avoid conflicting bind var…
harshit-gangal May 15, 2023
dab64b7
gen4 planner refactoring
systay May 15, 2023
446c4b1
gen4 planner refactoring: clean up logic, add comments
systay May 15, 2023
d97915d
projection not pushed on join when created for aggregation above, fix…
harshit-gangal May 17, 2023
60b8ea3
added new aggregation cases
harshit-gangal May 17, 2023
3a11228
add column to pushed aggregation if it is present in the original top…
harshit-gangal May 17, 2023
27c7c26
adding dummy grouping to right side aggregator when aggregation is pr…
harshit-gangal May 17, 2023
ea0c9c8
remove extra group by; use IF instead of COALESCE
systay May 17, 2023
101734c
projection to contain output columns as aliasExpr, count star multipl…
harshit-gangal May 19, 2023
5cbeb26
make sure to add projection columns for all aggregations
systay May 19, 2023
3d95ce7
add group by on the RHS of split aggregations so we don't get invalid…
systay May 19, 2023
6aa627f
test expectations
systay May 19, 2023
44eb9de
make it possible to push sorting under projection
systay May 19, 2023
aefc508
refactoring
systay May 19, 2023
ca95c24
handle opcode.AggregationRandom in the new operator model
systay May 19, 2023
2cb3b4a
add logging for each operator transformation
systay May 20, 2023
6af4190
test expectation
systay May 20, 2023
d16ec81
handle count on columns
systay May 20, 2023
7e1f905
refactor code and re-use count(*) more aggresively
systay May 20, 2023
cbf18fa
add random e2e testing for aggregations
systay May 20, 2023
9c4cadf
added test with known inconsistencies between mysql and vitess
systay May 20, 2023
caa25d7
refactor to use types
systay May 22, 2023
266ed85
remove failing test and remove unnecessary printing
systay May 22, 2023
282ab7f
added EnableGeneralLog method to enable general logs on all the mysql…
harshit-gangal May 22, 2023
ce8b7ef
print vitess plan when vitess and mysql result does not match
harshit-gangal May 22, 2023
8bb3370
add ORDER BY code to the fuzzer
systay May 22, 2023
c97c627
add group by order by test
harshit-gangal May 22, 2023
06475db
do not pass 0 to rand.Intn func as it causes panic
harshit-gangal May 22, 2023
82b0175
some code refactor and comments
harshit-gangal May 22, 2023
97eff7d
added comments
systay May 24, 2023
566a796
added more comments
systay May 24, 2023
3c1b92c
refactor and comment after review
systay May 24, 2023
fd62d66
add column names to projections
systay May 25, 2023
41235db
Merge remote-tracking branch 'upstream/main' into aggr-op
harshit-gangal May 25, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -1313,3 +1313,19 @@ func (cluster *LocalProcessCluster) EnableVTOrcRecoveries(t *testing.T) {
vtorc.EnableGlobalRecoveries(t)
}
}

// EnableGeneralLog turns on the MySQL general query log on every mysqld
// started by this cluster, by issuing `set global general_log = 1` against
// each vttablet's underlying MySQL server.
// It is intended only for local debugging and should not be left enabled in CI.
func (cluster *LocalProcessCluster) EnableGeneralLog() error {
	for _, keyspace := range cluster.Keyspaces {
		for _, shard := range keyspace.Shards {
			for _, tablet := range shard.Vttablets {
				if _, err := tablet.VttabletProcess.QueryTablet("set global general_log = 1", "", false); err != nil {
					return err
				}
			}
		}
	}
	return nil
}
6 changes: 3 additions & 3 deletions go/test/endtoend/utils/cmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (mcmp *MySQLCompare) Exec(query string) *sqltypes.Result {

mysqlQr, err := mcmp.MySQLConn.ExecuteFetch(query, 1000, true)
require.NoError(mcmp.t, err, "[MySQL Error] for query: "+query)
compareVitessAndMySQLResults(mcmp.t, query, vtQr, mysqlQr, false)
compareVitessAndMySQLResults(mcmp.t, query, mcmp.VtConn, vtQr, mysqlQr, false)
return vtQr
}

Expand All @@ -222,7 +222,7 @@ func (mcmp *MySQLCompare) ExecWithColumnCompare(query string) *sqltypes.Result {

mysqlQr, err := mcmp.MySQLConn.ExecuteFetch(query, 1000, true)
require.NoError(mcmp.t, err, "[MySQL Error] for query: "+query)
compareVitessAndMySQLResults(mcmp.t, query, vtQr, mysqlQr, true)
compareVitessAndMySQLResults(mcmp.t, query, mcmp.VtConn, vtQr, mysqlQr, true)
return vtQr
}

Expand All @@ -241,7 +241,7 @@ func (mcmp *MySQLCompare) ExecAllowAndCompareError(query string) (*sqltypes.Resu
// Since we allow errors, we don't want to compare results if one of the client failed.
// Vitess and MySQL should always be agreeing whether the query returns an error or not.
if vtErr == nil && mysqlErr == nil {
compareVitessAndMySQLResults(mcmp.t, query, vtQr, mysqlQr, false)
compareVitessAndMySQLResults(mcmp.t, query, mcmp.VtConn, vtQr, mysqlQr, false)
}
return vtQr, vtErr
}
Expand Down
6 changes: 5 additions & 1 deletion go/test/endtoend/utils/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func prepareMySQLWithSchema(params mysql.ConnParams, sql string) error {
return nil
}

func compareVitessAndMySQLResults(t *testing.T, query string, vtQr, mysqlQr *sqltypes.Result, compareColumns bool) {
func compareVitessAndMySQLResults(t *testing.T, query string, vtConn *mysql.Conn, vtQr, mysqlQr *sqltypes.Result, compareColumns bool) {
if vtQr == nil && mysqlQr == nil {
return
}
Expand Down Expand Up @@ -207,6 +207,10 @@ func compareVitessAndMySQLResults(t *testing.T, query string, vtQr, mysqlQr *sql
for _, row := range mysqlQr.Rows {
errStr += fmt.Sprintf("%s\n", row)
}
if vtConn != nil {
qr := Exec(t, vtConn, fmt.Sprintf("vexplain plan %s", query))
errStr += fmt.Sprintf("query plan: \n%s\n", qr.Rows[0][0].ToString())
}
t.Error(errStr)
}

Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func ExecCompareMySQL(t *testing.T, vtConn, mysqlConn *mysql.Conn, query string)

mysqlQr, err := mysqlConn.ExecuteFetch(query, 1000, true)
require.NoError(t, err, "[MySQL Error] for query: "+query)
compareVitessAndMySQLResults(t, query, vtQr, mysqlQr, false)
compareVitessAndMySQLResults(t, query, vtConn, vtQr, mysqlQr, false)
return vtQr
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,12 @@ func TestGroupBy(t *testing.T) {
mcmp.Exec("insert into t3(id5, id6, id7) values(1,1,2), (2,2,4), (3,2,4), (4,1,2), (5,1,2), (6,3,6)")
// test ordering and group by int column
mcmp.AssertMatches("select id6, id7, count(*) k from t3 group by id6, id7 order by k", `[[INT64(3) INT64(6) INT64(1)] [INT64(2) INT64(4) INT64(2)] [INT64(1) INT64(2) INT64(3)]]`)
mcmp.AssertMatches("select /*vt+ PLANNER=Gen4 */ id6+id7, count(*) k from t3 group by id6+id7 order by k", `[[INT64(9) INT64(1)] [INT64(6) INT64(2)] [INT64(3) INT64(3)]]`)

// Test the same queries in streaming mode
utils.Exec(t, mcmp.VtConn, "set workload = olap")
mcmp.AssertMatches("select id6, id7, count(*) k from t3 group by id6, id7 order by k", `[[INT64(3) INT64(6) INT64(1)] [INT64(2) INT64(4) INT64(2)] [INT64(1) INT64(2) INT64(3)]]`)
mcmp.AssertMatches("select /*vt+ PLANNER=Gen4 */ id6+id7, count(*) k from t3 group by id6+id7 order by k", `[[INT64(9) INT64(1)] [INT64(6) INT64(2)] [INT64(3) INT64(3)]]`)
}

func TestDistinct(t *testing.T) {
Expand Down
212 changes: 212 additions & 0 deletions go/test/endtoend/vtgate/queries/aggregation/fuzz_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
Copyright 2023 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package aggregation

import (
"fmt"
"math/rand"
"strings"
"testing"
"time"

"golang.org/x/exp/maps"

"vitess.io/vitess/go/vt/log"
)

// Lightweight schema model used by the fuzzer: just enough metadata to
// generate random (but type-correct) queries against the test tables.
type (
	// column is a single table column: its name and its SQL type (e.g. "bigint", "varchar").
	column struct {
		name string
		typ  string
	}
	// tableT is a table: its name and the columns it contains.
	tableT struct {
		name    string
		columns []column
	}
)

// TestFuzzAggregations randomizes both the data and the aggregation queries,
// and checks that MySQL returns the same values that Vitess does for each one.
func TestFuzzAggregations(t *testing.T) {
	mcmp, closer := start(t)
	defer closer()

	// Insert a random — but non-zero — number of rows into each table.
	// rand.Intn(20) can return 0, which would yield an invalid
	// "insert ... values ;" statement, so we add 1 to the draw.
	noOfRows := rand.Intn(20) + 1
	var values []string
	for i := 0; i < noOfRows; i++ {
		values = append(values, fmt.Sprintf("(%d, 'name%d', 'value%d', %d)", i, i, i, i))
	}
	t1Insert := fmt.Sprintf("insert into t1 (t1_id, name, value, shardKey) values %s;", strings.Join(values, ","))
	values = nil
	noOfRows = rand.Intn(20) + 1
	for i := 0; i < noOfRows; i++ {
		values = append(values, fmt.Sprintf("(%d, %d)", i, i))
	}
	t2Insert := fmt.Sprintf("insert into t2 (id, shardKey) values %s;", strings.Join(values, ","))

	mcmp.Exec(t1Insert)
	mcmp.Exec(t2Insert)

	// On failure, print the inserts so the failing run can be reproduced locally.
	t.Cleanup(func() {
		if t.Failed() {
			fmt.Println(t1Insert)
			fmt.Println(t2Insert)
		}
	})

	schema := map[string]tableT{
		"t1": {name: "t1", columns: []column{
			{name: "t1_id", typ: "bigint"},
			{name: "name", typ: "varchar"},
			{name: "value", typ: "varchar"},
			{name: "shardKey", typ: "bigint"},
		}},
		"t2": {name: "t2", columns: []column{
			{name: "id", typ: "bigint"},
			{name: "shardKey", typ: "bigint"},
		}},
	}

	endBy := time.Now().Add(1 * time.Second)
	schemaTables := maps.Values(schema)

	var queryCount int
	// Run queries until the time budget is spent, stopping early on the first
	// failure. (The previous condition `... || t.Failed()` kept the loop alive
	// forever once a query had failed.)
	for time.Now().Before(endBy) && !t.Failed() {
		tables := createTables(schemaTables)
		query := randomQuery(tables, 3, 3)
		mcmp.Exec(query)
		if t.Failed() {
			// Print the failing query so it can be replayed by hand.
			fmt.Println(query)
		}
		queryCount++
	}
	// log.Info does not interpret format verbs; Infof is required for %d.
	log.Infof("Queries successfully executed: %d", queryCount)
}

// randomQuery builds a random aggregation query over the given tables:
// up to maxAggrs aggregate expressions, random join predicates, and up to
// maxGroupBy grouping columns. ORDER BY generation is currently disabled.
func randomQuery(tables []tableT, maxAggrs, maxGroupBy int) string {
	// randomCol picks a random column of the table at tblIdx and returns the
	// aliased column expression together with its type.
	randomCol := func(tblIdx int) (string, string) {
		tbl := tables[tblIdx]
		c := randomEl(tbl.columns)
		return fmt.Sprintf("tbl%d.%s", tblIdx, c.name), c.typ
	}

	predicates := createPredicates(tables, randomCol)
	aggregates := createAggregations(tables, maxAggrs, randomCol)
	grouping := createGroupBy(tables, maxGroupBy, randomCol)

	aliased := make([]string, 0, len(tables))
	for idx, tbl := range tables {
		aliased = append(aliased, fmt.Sprintf("%s as tbl%d", tbl.name, idx))
	}

	query := "select /*vt+ PLANNER=Gen4 */ " + strings.Join(aggregates, ", ") + " from " + strings.Join(aliased, ", ")
	if len(predicates) > 0 {
		query += " where " + strings.Join(predicates, " and ")
	}
	if len(grouping) > 0 {
		query += " group by " + strings.Join(grouping, ", ")
	}

	// we do it this way so we don't have to do only `only_full_group_by` queries
	var noOfOrderBy int
	if len(grouping) > 0 {
		// rand.Intn panics when its argument is 0, so only draw when grouping exists
		noOfOrderBy = rand.Intn(len(grouping))
	}
	if noOfOrderBy > 0 {
		noOfOrderBy = 0 // TODO turning on ORDER BY here causes lots of failures
	}
	if noOfOrderBy > 0 {
		orderBy := make([]string, 0, noOfOrderBy)
		for ; noOfOrderBy > 0; noOfOrderBy-- {
			if rand.Intn(2) == 0 || len(grouping) == 0 {
				orderBy = append(orderBy, randomEl(aggregates))
			} else {
				orderBy = append(orderBy, randomEl(grouping))
			}
		}
		query += " order by " + strings.Join(orderBy, ", ")
	}
	return query
}

// createGroupBy returns between 0 and maxGB-1 random GROUP BY column expressions,
// each drawn from a randomly chosen table via randomCol.
func createGroupBy(tables []tableT, maxGB int, randomCol func(tblIdx int) (string, string)) (grouping []string) {
	for remaining := rand.Intn(maxGB); remaining > 0; remaining-- {
		colExpr, _ := randomCol(rand.Intn(len(tables)))
		grouping = append(grouping, colExpr)
	}
	return
}

// createAggregations builds between 1 and maxAggrs random aggregate expressions
// over randomly chosen columns. Only count(*) and count(col) are enabled for
// now; the remaining aggregations stay commented out until they are supported.
func createAggregations(tables []tableT, maxAggrs int, randomCol func(tblIdx int) (string, string)) (aggregates []string) {
	aggregations := []func(string) string{
		func(_ string) string { return "count(*)" },
		func(e string) string { return fmt.Sprintf("count(%s)", e) },
		//func(e string) string { return fmt.Sprintf("sum(%s)", e) },
		//func(e string) string { return fmt.Sprintf("avg(%s)", e) },
		//func(e string) string { return fmt.Sprintf("min(%s)", e) },
		//func(e string) string { return fmt.Sprintf("max(%s)", e) },
	}

	for remaining := rand.Intn(maxAggrs) + 1; remaining > 0; remaining-- {
		expr, _ := randomCol(rand.Intn(len(tables)))
		aggregates = append(aggregates, randomEl(aggregations)(expr))
	}
	return aggregates
}

// createTables picks one or two tables (with repetition allowed) at random
// from the schema for use in a generated query.
func createTables(schemaTables []tableT) []tableT {
	tables := make([]tableT, 0, 2)
	for remaining := rand.Intn(2) + 1; remaining > 0; remaining-- {
		tables = append(tables, randomEl(schemaTables))
	}
	return tables
}

// createPredicates builds random equality join predicates between every
// ordered pair of distinct tables, only pairing columns whose types agree.
func createPredicates(tables []tableT, randomCol func(tblIdx int) (string, string)) (predicates []string) {
	for idx1 := range tables {
		for idx2 := range tables {
			if idx1 == idx2 {
				continue
			}
			// Zero or one predicate for this pair of tables.
			remaining := rand.Intn(2)
			for remaining > 0 {
				lhs, lhsTyp := randomCol(idx1)
				rhs, rhsTyp := randomCol(idx2)
				// Retry until both sides have the same type, so the
				// comparison is meaningful.
				if lhsTyp != rhsTyp {
					continue
				}
				predicates = append(predicates, fmt.Sprintf("%s = %s", lhs, rhs))
				remaining--
			}
		}
	}
	return predicates
}

// randomEl returns a uniformly random element of in. It panics when in is empty.
func randomEl[K any](in []K) K {
	idx := rand.Intn(len(in))
	return in[idx]
}
3 changes: 3 additions & 0 deletions go/test/endtoend/vtgate/queries/orderby/orderby_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ func TestOrderBy(t *testing.T) {
mcmp.AssertMatches("select id1, id2 from t4 order by id2 desc", `[[INT64(5) VARCHAR("test")] [INT64(8) VARCHAR("F")] [INT64(7) VARCHAR("e")] [INT64(6) VARCHAR("d")] [INT64(4) VARCHAR("c")] [INT64(3) VARCHAR("b")] [INT64(2) VARCHAR("Abc")] [INT64(1) VARCHAR("a")]]`)
// test ordering of int column
mcmp.AssertMatches("select id1, id2 from t4 order by id1 desc", `[[INT64(8) VARCHAR("F")] [INT64(7) VARCHAR("e")] [INT64(6) VARCHAR("d")] [INT64(5) VARCHAR("test")] [INT64(4) VARCHAR("c")] [INT64(3) VARCHAR("b")] [INT64(2) VARCHAR("Abc")] [INT64(1) VARCHAR("a")]]`)
// test ordering of complex column
mcmp.AssertMatches("select /*vt+ PLANNER=Gen4 */ id1, id2 from t4 order by reverse(id2) desc", `[[INT64(5) VARCHAR("test")] [INT64(8) VARCHAR("F")] [INT64(7) VARCHAR("e")] [INT64(6) VARCHAR("d")] [INT64(2) VARCHAR("Abc")] [INT64(4) VARCHAR("c")] [INT64(3) VARCHAR("b")] [INT64(1) VARCHAR("a")]]`)

defer func() {
utils.Exec(t, mcmp.VtConn, "set workload = oltp")
Expand All @@ -75,4 +77,5 @@ func TestOrderBy(t *testing.T) {
utils.Exec(t, mcmp.VtConn, "set workload = olap")
mcmp.AssertMatches("select id1, id2 from t4 order by id2 desc", `[[INT64(5) VARCHAR("test")] [INT64(8) VARCHAR("F")] [INT64(7) VARCHAR("e")] [INT64(6) VARCHAR("d")] [INT64(4) VARCHAR("c")] [INT64(3) VARCHAR("b")] [INT64(2) VARCHAR("Abc")] [INT64(1) VARCHAR("a")]]`)
mcmp.AssertMatches("select id1, id2 from t4 order by id1 desc", `[[INT64(8) VARCHAR("F")] [INT64(7) VARCHAR("e")] [INT64(6) VARCHAR("d")] [INT64(5) VARCHAR("test")] [INT64(4) VARCHAR("c")] [INT64(3) VARCHAR("b")] [INT64(2) VARCHAR("Abc")] [INT64(1) VARCHAR("a")]]`)
mcmp.AssertMatches("select /*vt+ PLANNER=Gen4 */ id1, id2 from t4 order by reverse(id2) desc", `[[INT64(5) VARCHAR("test")] [INT64(8) VARCHAR("F")] [INT64(7) VARCHAR("e")] [INT64(6) VARCHAR("d")] [INT64(2) VARCHAR("Abc")] [INT64(4) VARCHAR("c")] [INT64(3) VARCHAR("b")] [INT64(1) VARCHAR("a")]]`)
}
27 changes: 27 additions & 0 deletions go/vt/sqlparser/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -2867,6 +2867,33 @@ type (
}

CountStar struct {
_ bool
// TL;DR; This makes sure that reference equality checks works as expected
//
// You're correct that this might seem a bit strange at first glance.
// It's a quirk of Go's handling of empty structs. In Go, two instances of an empty struct are considered
// identical, which can be problematic when using these as keys in maps.
// They would be treated as the same key and potentially lead to incorrect map behavior.
//
// Here's a brief example:
//
// ```golang
// func TestWeirdGo(t *testing.T) {
// type CountStar struct{}
//
// cs1 := &CountStar{}
// cs2 := &CountStar{}
// if cs1 == cs2 {
// panic("what the what!?")
// }
// }
// ```
//
// In the above code, cs1 and cs2, despite being distinct variables, would be treated as the same object.
//
// The solution we employed was to add a dummy field `_ bool` to the otherwise empty struct `CountStar`.
// This ensures that each instance of `CountStar` is treated as a separate object,
// even in the context of our semantic state, which uses these objects as map keys.
}

Avg struct {
Expand Down
5 changes: 5 additions & 0 deletions go/vt/sqlparser/ast_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,11 @@ func writeEscapedString(buf *TrackedBuffer, original string) {
buf.WriteByte('`')
}

// CompliantString renders the given SQLNode as SQL text and converts that text
// into a compliant name (safe to use as an identifier, e.g. a bind variable name).
func CompliantString(in SQLNode) string {
	return compliantName(String(in))
}

func compliantName(in string) string {
var buf strings.Builder
for i, c := range in {
Expand Down
10 changes: 7 additions & 3 deletions go/vt/sqlparser/ast_rewriting.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,15 @@ func (r *ReservedVars) ReserveAll(names ...string) bool {
// with the same name already exists, it'll be suffixed with a numeric identifier
// to make it unique.
func (r *ReservedVars) ReserveColName(col *ColName) string {
compliantName := col.CompliantName()
if r.fast && strings.HasPrefix(compliantName, r.prefix) {
compliantName = "_" + compliantName
reserveName := col.CompliantName()
if r.fast && strings.HasPrefix(reserveName, r.prefix) {
reserveName = "_" + reserveName
}

return r.ReserveVariable(reserveName)
}

func (r *ReservedVars) ReserveVariable(compliantName string) string {
joinVar := []byte(compliantName)
baseLen := len(joinVar)
i := int64(1)
Expand Down
10 changes: 10 additions & 0 deletions go/vt/sqlparser/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading