From 6d8adac038a7a27e089c88e1460490d3049036e1 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 10 Jul 2023 14:02:01 +0530 Subject: [PATCH 1/7] push down distinct aggregation on join Signed-off-by: Harshit Gangal --- .../queries/aggregation/aggregation_test.go | 16 ++++ .../operators/aggregation_pushing.go | 9 ++- .../planbuilder/testdata/aggr_cases.json | 73 ++++++++++++++++--- 3 files changed, 85 insertions(+), 13 deletions(-) diff --git a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go index c7bc80c325b..79928040229 100644 --- a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go +++ b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go @@ -528,3 +528,19 @@ func compareRow(t *testing.T, mRes *sqltypes.Result, vtRes *sqltypes.Result, grp require.True(t, foundKey, "mysql and vitess result does not same row: vitess:%v, mysql:%v", vtRes.Rows, mRes.Rows) } } + +func TestDistinctAggregation(t *testing.T) { + mcmp, closer := start(t) + defer closer() + mcmp.Exec("insert into t1(t1_id, `name`, `value`, shardkey) values(1,'a1','foo',100), (2,'b1','foo',200), (3,'c1','foo',300), (4,'a1','foo',100), (5,'d1','toto',200), (6,'c1','tata',893), (7,'a1','titi',2380), (8,'b1','tete',12833), (9,'e1','yoyo',783493)") + + for _, query := range []string{ + `SELECT /*vt+ PLANNER=gen4 */ COUNT(DISTINCT value), SUM(DISTINCT shardkey) FROM t1`, + `SELECT /*vt+ PLANNER=gen4 */ a.t1_id, SUM(DISTINCT b.shardkey) FROM t1 a, t1 b group by a.t1_id`, + `SELECT /*vt+ PLANNER=gen4 */ a.value, SUM(DISTINCT b.shardkey) FROM t1 a, t1 b group by a.value`, + } { + t.Run(query, func(t *testing.T) { + mcmp.Exec(query) + }) + } +} diff --git a/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go b/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go index cb506aac959..80aba9ca1f3 100644 --- a/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go +++ b/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go @@ -70,8 +70,8 @@ func (a *Aggregator) aggregateTheAggregates() { func aggregateTheAggregate(a *Aggregator, i int) { aggr := a.Aggregations[i] switch aggr.OpCode { - case opcode.AggregateCount, opcode.AggregateCountStar, opcode.AggregateCountDistinct: - // All count variations turn into SUM above the Route. + case opcode.AggregateCount, opcode.AggregateCountStar, opcode.AggregateCountDistinct, opcode.AggregateSumDistinct: + // All count variations turn into SUM above the Route. This is also applied for Sum distinct when it is pushed down. // Think of it as we are SUMming together a bunch of distributed COUNTs. aggr.OriginalOpCode, aggr.OpCode = aggr.OpCode, opcode.AggregateSum a.Aggregations[i] = aggr @@ -509,7 +509,10 @@ func (ab *aggBuilder) handleAggr(ctx *plancontext.PlanningContext, aggr Aggr) er // this is only used for SHOW GTID queries that will never contain joins return vterrors.VT13001("cannot do join with vgtid") case opcode.AggregateSumDistinct, opcode.AggregateCountDistinct: - return errAbortAggrPushing + if !exprHasUniqueVindex(ctx, aggr.Func.GetArg()) { + return errAbortAggrPushing + } + return ab.handlePushThroughAggregation(ctx, aggr) default: return errHorizonNotPlanned() } diff --git a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json index aa54fa01776..c7462522ce0 100644 --- a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json @@ -579,15 +579,15 @@ } }, { - "comment": "count with distinct unique vindex", - "query": "select col, count(distinct id) from user group by col", + "comment": "count and sum with distinct unique vindex", + "query": "select col, count(distinct id), sum(distinct id) from user group by col", "v3-plan": { "QueryType": "SELECT", - "Original": "select col, count(distinct id) from user group by col", + "Original": "select col, count(distinct id), sum(distinct id) from user group by col", "Instructions": { "OperatorType": "Aggregate", "Variant": "Ordered", - "Aggregates": "sum_count(1) AS count", + "Aggregates": "sum_count(1) AS count, sum(2)", "GroupBy": "0", "Inputs": [ { @@ -597,9 +597,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select col, count(distinct id) from `user` where 1 != 1 group by col", + "FieldQuery": "select col, count(distinct id), sum(distinct id) from `user` where 1 != 1 group by col", "OrderBy": "0 ASC", - "Query": "select col, count(distinct id) from `user` group by col order by col asc", + "Query": "select col, count(distinct id), sum(distinct id) from `user` group by col order by col asc", "Table": "`user`" } ] @@ -607,11 +607,11 @@ }, "gen4-plan": { "QueryType": "SELECT", - "Original": "select col, count(distinct id) from user group by col", + "Original": "select col, count(distinct id), sum(distinct id) from user group by col", "Instructions": { "OperatorType": "Aggregate", "Variant": "Ordered", - "Aggregates": "sum_count_distinct(1) AS count(distinct id)", + "Aggregates": "sum_count_distinct(1) AS count(distinct id), sum_sum_distinct(2) AS sum(distinct id)", "GroupBy": "0", "Inputs": [ { @@ -621,9 +621,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select col, count(distinct id) from `user` where 1 != 1 group by col", + "FieldQuery": "select col, count(distinct id), sum(distinct id) from `user` where 1 != 1 group by col", "OrderBy": "0 ASC", - "Query": "select col, count(distinct id) from `user` group by col order by col asc", + "Query": "select col, count(distinct id), sum(distinct id) from `user` group by col order by col asc", "Table": "`user`" } ] @@ -7044,5 +7044,58 @@ "user.user" ] } + }, + { + "comment": "count distinct and sum distinct on join query pushed down - unique vindex", + "query": "select u.col1, count(distinct m.user_id), sum(distinct m.user_id) from user u join music m group by u.col1", + "v3-plan": "VT12001: unsupported: cross-shard query with aggregates", + "gen4-plan": { + "QueryType": "SELECT", + "Original": "select u.col1, count(distinct m.user_id), sum(distinct m.user_id) from user u join music m group by u.col1", + "Instructions": { + "OperatorType": "Aggregate", + "Variant": "Ordered", + "Aggregates": "sum_count_distinct(1) AS count(distinct m.user_id), sum_sum_distinct(2) AS sum(distinct m.user_id)", + "GroupBy": "(0|3)", + "ResultColumns": 3, + "Inputs": [ + { + "OperatorType": "Join", + "Variant": "Join", + "JoinColumnIndexes": "L:0,R:0,R:1,L:1", + "TableName": "`user`_music", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select u.col1, weight_string(u.col1) from `user` as u where 1 != 1 group by u.col1, weight_string(u.col1)", + "OrderBy": "(0|1) ASC", + "Query": "select u.col1, weight_string(u.col1) from `user` as u group by u.col1, weight_string(u.col1) order by u.col1 asc", + "Table": "`user`" + }, + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select count(distinct m.user_id), sum(distinct m.user_id) from music as m where 1 != 1 group by .0", + "Query": "select count(distinct m.user_id), sum(distinct m.user_id) from music as m group by .0", + "Table": "music" + } + ] + } + ] + }, + "TablesUsed": [ + "user.music", + "user.user" + ] + } } ] From deb4d8637601cc6c5e859ddb374f23ae071d7369 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Mon, 10 Jul 2023 11:33:09 +0200 Subject: [PATCH 2/7] fail the correct test Signed-off-by: Andres Taylor --- go/test/endtoend/utils/cmp.go | 11 +++++++++++ .../vtgate/queries/aggregation/aggregation_test.go | 2 +- 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/go/test/endtoend/utils/cmp.go b/go/test/endtoend/utils/cmp.go index bf66fc6ef2b..f0267c55fe6 100644 --- a/go/test/endtoend/utils/cmp.go +++ b/go/test/endtoend/utils/cmp.go @@ -256,3 +256,14 @@ func (mcmp *MySQLCompare) ExecAndIgnore(query string) (*sqltypes.Result, error) _, _ = mcmp.MySQLConn.ExecuteFetch(query, 1000, true) return mcmp.VtConn.ExecuteFetch(query, 1000, true) } + +func (mcmp *MySQLCompare) Run(query string, f func(mcmp *MySQLCompare)) { + mcmp.t.Run(query, func(t *testing.T) { + inner := &MySQLCompare{ + t: t, + MySQLConn: mcmp.MySQLConn, + VtConn: mcmp.VtConn, + } + f(inner) + }) +} diff --git a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go index 79928040229..a26f7d08eba 100644 --- a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go +++ b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go @@ -539,7 +539,7 @@ func TestDistinctAggregation(t *testing.T) { `SELECT /*vt+ PLANNER=gen4 */ a.t1_id, SUM(DISTINCT b.shardkey) FROM t1 a, t1 b group by a.t1_id`, `SELECT /*vt+ PLANNER=gen4 */ a.value, SUM(DISTINCT b.shardkey) FROM t1 a, t1 b group by a.value`, } { - t.Run(query, func(t *testing.T) { + mcmp.Run(query, func(mcmp *utils.MySQLCompare) { mcmp.Exec(query) }) } From 254e401623a908cba7c5eca8e6a0faf507f5a153 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 10 Jul 2023 15:58:48 +0530 Subject: [PATCH 3/7] added the failing unit test Signed-off-by: Harshit Gangal --- .../vtgate/engine/scalar_aggregation_test.go | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/go/vt/vtgate/engine/scalar_aggregation_test.go b/go/vt/vtgate/engine/scalar_aggregation_test.go index 033d69c9271..810d67f2b53 100644 --- a/go/vt/vtgate/engine/scalar_aggregation_test.go +++ b/go/vt/vtgate/engine/scalar_aggregation_test.go @@ -24,6 +24,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql/collations" + "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/sqltypes" @@ -255,6 +257,48 @@ func TestScalarGroupConcatWithAggrOnEngine(t *testing.T) { } } +// TestScalarDistinctAggr tests distinct aggregation on engine. +func TestScalarDistinctAggr(t *testing.T) { + fields := sqltypes.MakeTestFields( + "value|sum(distinct shardkey)", + "varchar|decimal", + ) + + fp := &fakePrimitive{results: []*sqltypes.Result{sqltypes.MakeTestResult( + fields, + "foo|600", + "tata|893", + "tete|12833", + "titi|2380", + "toto|200", + "yoyo|783493", + )}} + param := NewAggregateParam(AggregateCountDistinct, 0, "count(distinct value)") + param.CollationID = collations.CollationUtf8mb4ID + + param2 := NewAggregateParam(AggregateSum, 1, "sum(distinct sharkey)") + param2.OrigOpcode = AggregateSumDistinct + oa := &ScalarAggregate{ + Aggregates: []*AggregateParams{param, param2}, + Input: fp, + } + qr, err := oa.TryExecute(context.Background(), &noopVCursor{}, nil, false) + require.NoError(t, err) + require.Equal(t, `[INT64(6) DECIMAL(800199)]`, fmt.Sprintf("%v", qr.Rows)) + + fp.rewind() + results := &sqltypes.Result{} + err = oa.TryStreamExecute(context.Background(), &noopVCursor{}, nil, true, func(qr *sqltypes.Result) error { + if qr.Fields != nil { + results.Fields = qr.Fields + } + results.Rows = append(results.Rows, qr.Rows...) + return nil + }) + require.NoError(t, err) + require.Equal(t, `[INT64(6) DECIMAL(800199)]`, fmt.Sprintf("%v", results.Rows)) +} + // TestScalarGroupConcat tests group_concat with partial aggregation on engine. func TestScalarGroupConcat(t *testing.T) { fields := sqltypes.MakeTestFields( From 83a2394e7e2094ce39256d31ce7cceb218f66703 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 11 Jul 2023 00:35:35 +0530 Subject: [PATCH 4/7] fix aggregation to not push partial distinct - all or none Signed-off-by: Harshit Gangal --- .../queries/aggregation/aggregation_test.go | 4 +- .../vtgate/engine/scalar_aggregation_test.go | 79 ++++++++++++---- .../operators/aggregation_pushing.go | 89 +++++++++++++------ .../planbuilder/operators/aggregator.go | 4 +- .../planbuilder/testdata/aggr_cases.json | 4 +- 5 files changed, 132 insertions(+), 48 deletions(-) diff --git a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go index a26f7d08eba..b3d96970b43 100644 --- a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go +++ b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go @@ -535,9 +535,11 @@ func TestDistinctAggregation(t *testing.T) { mcmp.Exec("insert into t1(t1_id, `name`, `value`, shardkey) values(1,'a1','foo',100), (2,'b1','foo',200), (3,'c1','foo',300), (4,'a1','foo',100), (5,'d1','toto',200), (6,'c1','tata',893), (7,'a1','titi',2380), (8,'b1','tete',12833), (9,'e1','yoyo',783493)") for _, query := range []string{ - `SELECT /*vt+ PLANNER=gen4 */ COUNT(DISTINCT value), SUM(DISTINCT shardkey) FROM t1`, + // `SELECT /*vt+ PLANNER=gen4 */ COUNT(DISTINCT value), SUM(DISTINCT shardkey) FROM t1`, - fails as different distinct expression. `SELECT /*vt+ PLANNER=gen4 */ a.t1_id, SUM(DISTINCT b.shardkey) FROM t1 a, t1 b group by a.t1_id`, `SELECT /*vt+ PLANNER=gen4 */ a.value, SUM(DISTINCT b.shardkey) FROM t1 a, t1 b group by a.value`, + // `SELECT /*vt+ PLANNER=gen4 */ count(distinct a.value), SUM(DISTINCT b.t1_id) FROM t1 a, t1 b`, - fails as different distinct expression. + `SELECT /*vt+ PLANNER=gen4 */ a.value, SUM(DISTINCT b.t1_id) FROM t1 a, t1 b group by a.value`, } { mcmp.Run(query, func(mcmp *utils.MySQLCompare) { mcmp.Exec(query) diff --git a/go/vt/vtgate/engine/scalar_aggregation_test.go b/go/vt/vtgate/engine/scalar_aggregation_test.go index 810d67f2b53..2dfc5b10763 100644 --- a/go/vt/vtgate/engine/scalar_aggregation_test.go +++ b/go/vt/vtgate/engine/scalar_aggregation_test.go @@ -24,8 +24,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "vitess.io/vitess/go/mysql/collations" - "vitess.io/vitess/go/test/utils" "vitess.io/vitess/go/sqltypes" @@ -258,33 +256,76 @@ func TestScalarGroupConcatWithAggrOnEngine(t *testing.T) { } // TestScalarDistinctAggr tests distinct aggregation on engine. -func TestScalarDistinctAggr(t *testing.T) { +func TestScalarDistinctAggrOnEngine(t *testing.T) { + fields := sqltypes.MakeTestFields( + "value|value", + "int64|int64", + ) + + fp := &fakePrimitive{results: []*sqltypes.Result{sqltypes.MakeTestResult( + fields, + "100|100", + "200|200", + "200|200", + "400|400", + "400|400", + "600|600", + )}} + + oa := &ScalarAggregate{ + Aggregates: []*AggregateParams{ + NewAggregateParam(AggregateCountDistinct, 0, "count(distinct value)"), + NewAggregateParam(AggregateSumDistinct, 1, "sum(distinct value)"), + }, + Input: fp, + } + qr, err := oa.TryExecute(context.Background(), &noopVCursor{}, nil, false) + require.NoError(t, err) + require.Equal(t, `[[INT64(4) DECIMAL(1300)]]`, fmt.Sprintf("%v", qr.Rows)) + + fp.rewind() + results := &sqltypes.Result{} + err = oa.TryStreamExecute(context.Background(), &noopVCursor{}, nil, true, func(qr *sqltypes.Result) error { + if qr.Fields != nil { + results.Fields = qr.Fields + } + results.Rows = append(results.Rows, qr.Rows...) + return nil + }) + require.NoError(t, err) + require.Equal(t, `[[INT64(4) DECIMAL(1300)]]`, fmt.Sprintf("%v", results.Rows)) +} + +func TestScalarDistinctPushedDown(t *testing.T) { fields := sqltypes.MakeTestFields( - "value|sum(distinct shardkey)", - "varchar|decimal", + "count(distinct value)|sum(distinct value)", + "int64|decimal", ) fp := &fakePrimitive{results: []*sqltypes.Result{sqltypes.MakeTestResult( fields, - "foo|600", - "tata|893", - "tete|12833", - "titi|2380", - "toto|200", - "yoyo|783493", + "2|200", + "6|400", + "3|700", + "1|10", + "7|30", + "8|90", )}} - param := NewAggregateParam(AggregateCountDistinct, 0, "count(distinct value)") - param.CollationID = collations.CollationUtf8mb4ID - param2 := NewAggregateParam(AggregateSum, 1, "sum(distinct sharkey)") - param2.OrigOpcode = AggregateSumDistinct + countAggr := NewAggregateParam(AggregateSum, 0, "count(distinct value)") + countAggr.OrigOpcode = AggregateCountDistinct + sumAggr := NewAggregateParam(AggregateSum, 1, "sum(distinct value)") + sumAggr.OrigOpcode = AggregateSumDistinct oa := &ScalarAggregate{ - Aggregates: []*AggregateParams{param, param2}, - Input: fp, + Aggregates: []*AggregateParams{ + countAggr, + sumAggr, + }, + Input: fp, } qr, err := oa.TryExecute(context.Background(), &noopVCursor{}, nil, false) require.NoError(t, err) - require.Equal(t, `[INT64(6) DECIMAL(800199)]`, fmt.Sprintf("%v", qr.Rows)) + require.Equal(t, `[[INT64(27) DECIMAL(1430)]]`, fmt.Sprintf("%v", qr.Rows)) fp.rewind() results := &sqltypes.Result{} @@ -296,7 +337,7 @@ func TestScalarDistinctAggr(t *testing.T) { return nil }) require.NoError(t, err) - require.Equal(t, `[INT64(6) DECIMAL(800199)]`, fmt.Sprintf("%v", results.Rows)) + require.Equal(t, `[[INT64(27) DECIMAL(1430)]]`, fmt.Sprintf("%v", results.Rows)) } // TestScalarGroupConcat tests group_concat with partial aggregation on engine. diff --git a/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go b/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go index 80aba9ca1f3..3a67a4fc2a2 100644 --- a/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go +++ b/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go @@ -115,35 +115,68 @@ func pushDownAggregationThroughRoute( // pushDownAggregations splits aggregations between the original aggregator and the one we are pushing down func pushDownAggregations(ctx *plancontext.PlanningContext, aggregator *Aggregator, aggrBelowRoute *Aggregator) error { - for i, aggregation := range aggregator.Aggregations { - if !aggregation.Distinct || exprHasUniqueVindex(ctx, aggregation.Func.GetArg()) { - aggrBelowRoute.Aggregations = append(aggrBelowRoute.Aggregations, aggregation) - aggregateTheAggregate(aggregator, i) + canPushDownDistinct, distinctExpr, err := checkIfWeCanPushDown(ctx, aggregator) + if err != nil { + return err + } + + if !canPushDownDistinct { + aggregator.DistinctExpr = distinctExpr + } + + aeDistinctExpr := aeWrap(aggregator.DistinctExpr) + offset := -1 + for i, aggr := range aggregator.Aggregations { + if aggr.Distinct && !canPushDownDistinct { + offset = aggr.ColOffset + aggrBelowRoute.Columns[offset] = aeDistinctExpr continue } - innerExpr := aggregation.Func.GetArg() + aggrBelowRoute.Aggregations = append(aggrBelowRoute.Aggregations, aggr) + aggregateTheAggregate(aggregator, i) + } - if aggregator.DistinctExpr != nil { - if ctx.SemTable.EqualsExpr(aggregator.DistinctExpr, innerExpr) { - // we can handle multiple distinct aggregations, as long as they are aggregating on the same expression - aggrBelowRoute.Columns[aggregation.ColOffset] = aeWrap(innerExpr) - continue - } - return vterrors.VT12001(fmt.Sprintf("only one DISTINCT aggregation is allowed in a SELECT: %s", sqlparser.String(aggregation.Original))) - } + // everything is pushed below the route. + if canPushDownDistinct { + return nil + } - // We handle a distinct aggregation by turning it into a group by and - // doing the aggregating on the vtgate level instead - aggregator.DistinctExpr = innerExpr - aeDistinctExpr := aeWrap(aggregator.DistinctExpr) + // We handle a distinct aggregation by turning it into a group by and + // doing the aggregating on the vtgate level instead + // Adding to group by can be done only once even though there are multiple distinct aggregation with same expression. + groupBy := NewGroupBy(aggregator.DistinctExpr, aggregator.DistinctExpr, aeDistinctExpr) + groupBy.ColOffset = offset + aggrBelowRoute.Grouping = append(aggrBelowRoute.Grouping, groupBy) - aggrBelowRoute.Columns[aggregation.ColOffset] = aeDistinctExpr + return nil +} + +func checkIfWeCanPushDown(ctx *plancontext.PlanningContext, aggregator *Aggregator) (bool, sqlparser.Expr, error) { + canPushDown := true + var distinctExpr sqlparser.Expr + var differentExpr *sqlparser.AliasedExpr - groupBy := NewGroupBy(aggregator.DistinctExpr, aggregator.DistinctExpr, aeDistinctExpr) - groupBy.ColOffset = aggregation.ColOffset - aggrBelowRoute.Grouping = append(aggrBelowRoute.Grouping, groupBy) + for _, aggr := range aggregator.Aggregations { + if !aggr.Distinct { + continue + } + innerExpr := aggr.Func.GetArg() + if !exprHasUniqueVindex(ctx, innerExpr) { + canPushDown = false + } + if distinctExpr == nil { + distinctExpr = innerExpr + } + if !ctx.SemTable.EqualsExpr(distinctExpr, innerExpr) { + differentExpr = aggr.Original + } } - return nil + + if !canPushDown && differentExpr != nil { + return false, nil, vterrors.VT12001(fmt.Sprintf("only one DISTINCT aggregation is allowed in a SELECT: %s", sqlparser.String(differentExpr))) + } + + return canPushDown, distinctExpr, nil } func pushDownAggregationThroughFilter( @@ -411,6 +444,15 @@ func splitAggrColumnsToLeftAndRight( outerJoin: join.LeftJoin, } + canPushDownDistinct, distinctExpr, err := checkIfWeCanPushDown(ctx, aggregator) + if err != nil { + return nil, nil, err + } + if !canPushDownDistinct { + aggregator.DistinctExpr = distinctExpr + return nil, nil, errAbortAggrPushing + } + outer: // we prefer adding the aggregations in the same order as the columns are declared for colIdx, col := range aggregator.Columns { @@ -509,9 +551,6 @@ func (ab *aggBuilder) handleAggr(ctx *plancontext.PlanningContext, aggr Aggr) er // this is only used for SHOW GTID queries that will never contain joins return vterrors.VT13001("cannot do join with vgtid") case opcode.AggregateSumDistinct, opcode.AggregateCountDistinct: - if !exprHasUniqueVindex(ctx, aggr.Func.GetArg()) { - return errAbortAggrPushing - } return ab.handlePushThroughAggregation(ctx, aggr) default: return errHorizonNotPlanned() diff --git a/go/vt/vtgate/planbuilder/operators/aggregator.go b/go/vt/vtgate/planbuilder/operators/aggregator.go index 36602e024d4..ce6b9cc1912 100644 --- a/go/vt/vtgate/planbuilder/operators/aggregator.go +++ b/go/vt/vtgate/planbuilder/operators/aggregator.go @@ -42,7 +42,9 @@ type ( Grouping []GroupBy Aggregations []Aggr - // We support a single distinct aggregation per aggregator. It is stored here + // We support a single distinct aggregation per aggregator. It is stored here. + // When planning the ordering that the OrderedAggregate will require, + // this needs to be the last ORDER BY expression DistinctExpr sqlparser.Expr // Pushed will be set to true once this aggregation has been pushed deeper in the tree diff --git a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json index c7462522ce0..0dbbe645080 100644 --- a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json @@ -3854,8 +3854,8 @@ "Sharded": true }, "FieldQuery": "select u.textcol1, u.val2, weight_string(u.val2) from `user` as u where 1 != 1", - "OrderBy": "0 ASC COLLATE latin1_swedish_ci", - "Query": "select u.textcol1, u.val2, weight_string(u.val2) from `user` as u order by u.textcol1 asc", + "OrderBy": "0 ASC COLLATE latin1_swedish_ci, (1|2) ASC", + "Query": "select u.textcol1, u.val2, weight_string(u.val2) from `user` as u order by u.textcol1 asc, u.val2 asc", "Table": "`user`" }, { From 8694c9992a3fc11a3cbec174ab18969d2cfbe5a8 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 11 Jul 2023 22:46:57 +0530 Subject: [PATCH 5/7] handle distinct aggregation pushing better with min/max having distinct Signed-off-by: Harshit Gangal --- go/test/endtoend/utils/cmp.go | 19 ++++ .../queries/aggregation/aggregation_test.go | 34 +++++-- .../vtgate/engine/scalar_aggregation_test.go | 3 +- .../operators/aggregation_pushing.go | 89 +++++++++++++------ .../planbuilder/testdata/aggr_cases.json | 6 +- 5 files changed, 111 insertions(+), 40 deletions(-) diff --git a/go/test/endtoend/utils/cmp.go b/go/test/endtoend/utils/cmp.go index f0267c55fe6..38726d6c3aa 100644 --- a/go/test/endtoend/utils/cmp.go +++ b/go/test/endtoend/utils/cmp.go @@ -267,3 +267,22 @@ func (mcmp *MySQLCompare) Run(query string, f func(mcmp *MySQLCompare)) { f(inner) }) } + +// ExecAllowError executes the query against both Vitess and MySQL. +// If there is no error, it compares the result +// Return any Vitess execution error without comparing the results. +func (mcmp *MySQLCompare) ExecAllowError(query string) (*sqltypes.Result, error) { + mcmp.t.Helper() + vtQr, vtErr := mcmp.VtConn.ExecuteFetch(query, 1000, true) + if vtErr != nil { + return nil, vtErr + } + mysqlQr, mysqlErr := mcmp.MySQLConn.ExecuteFetch(query, 1000, true) + + // Since we allow errors, we don't want to compare results if one of the client failed. + // Vitess and MySQL should always be agreeing whether the query returns an error or not. + if mysqlErr == nil { + vtErr = compareVitessAndMySQLResults(mcmp.t, query, mcmp.VtConn, vtQr, mysqlQr, false) + } + return vtQr, vtErr +} diff --git a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go index b3d96970b43..a51fd2aa24b 100644 --- a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go +++ b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go @@ -534,15 +534,31 @@ func TestDistinctAggregation(t *testing.T) { defer closer() mcmp.Exec("insert into t1(t1_id, `name`, `value`, shardkey) values(1,'a1','foo',100), (2,'b1','foo',200), (3,'c1','foo',300), (4,'a1','foo',100), (5,'d1','toto',200), (6,'c1','tata',893), (7,'a1','titi',2380), (8,'b1','tete',12833), (9,'e1','yoyo',783493)") - for _, query := range []string{ - // `SELECT /*vt+ PLANNER=gen4 */ COUNT(DISTINCT value), SUM(DISTINCT shardkey) FROM t1`, - fails as different distinct expression. - `SELECT /*vt+ PLANNER=gen4 */ a.t1_id, SUM(DISTINCT b.shardkey) FROM t1 a, t1 b group by a.t1_id`, - `SELECT /*vt+ PLANNER=gen4 */ a.value, SUM(DISTINCT b.shardkey) FROM t1 a, t1 b group by a.value`, - // `SELECT /*vt+ PLANNER=gen4 */ count(distinct a.value), SUM(DISTINCT b.t1_id) FROM t1 a, t1 b`, - fails as different distinct expression. - `SELECT /*vt+ PLANNER=gen4 */ a.value, SUM(DISTINCT b.t1_id) FROM t1 a, t1 b group by a.value`, - } { - mcmp.Run(query, func(mcmp *utils.MySQLCompare) { - mcmp.Exec(query) + tcases := []struct { + query string + expectedErr string + }{{ + query: `SELECT /*vt+ PLANNER=gen4 */ COUNT(DISTINCT value), SUM(DISTINCT shardkey) FROM t1`, + expectedErr: "VT12001: unsupported: only one DISTINCT aggregation is allowed in a SELECT: sum(distinct shardkey) (errno 1235) (sqlstate 42000)", + }, { + query: `SELECT /*vt+ PLANNER=gen4 */ a.t1_id, SUM(DISTINCT b.shardkey) FROM t1 a, t1 b group by a.t1_id`, + }, { + query: `SELECT /*vt+ PLANNER=gen4 */ a.value, SUM(DISTINCT b.shardkey) FROM t1 a, t1 b group by a.value`, + }, { + query: `SELECT /*vt+ PLANNER=gen4 */ count(distinct a.value), SUM(DISTINCT b.t1_id) FROM t1 a, t1 b`, + expectedErr: "VT12001: unsupported: only one DISTINCT aggregation is allowed in a SELECT: sum(distinct b.t1_id) (errno 1235) (sqlstate 42000)", + }, { + query: `SELECT /*vt+ PLANNER=gen4 */ a.value, SUM(DISTINCT b.t1_id), min(DISTINCT a.t1_id) FROM t1 a, t1 b group by a.value`, + }} + + for _, tc := range tcases { + mcmp.Run(tc.query, func(mcmp *utils.MySQLCompare) { + _, err := mcmp.ExecAllowError(tc.query) + if tc.expectedErr == "" { + require.NoError(t, err) + return + } + require.ErrorContains(t, err, tc.expectedErr) }) } } diff --git a/go/vt/vtgate/engine/scalar_aggregation_test.go b/go/vt/vtgate/engine/scalar_aggregation_test.go index 2dfc5b10763..3329fc72d39 100644 --- a/go/vt/vtgate/engine/scalar_aggregation_test.go +++ b/go/vt/vtgate/engine/scalar_aggregation_test.go @@ -24,9 +24,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "vitess.io/vitess/go/test/utils" - "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/utils" . "vitess.io/vitess/go/vt/vtgate/engine/opcode" ) diff --git a/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go b/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go index 3a67a4fc2a2..dea4d39f667 100644 --- a/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go +++ b/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go @@ -115,38 +115,66 @@ func pushDownAggregationThroughRoute( // pushDownAggregations splits aggregations between the original aggregator and the one we are pushing down func pushDownAggregations(ctx *plancontext.PlanningContext, aggregator *Aggregator, aggrBelowRoute *Aggregator) error { - canPushDownDistinct, distinctExpr, err := checkIfWeCanPushDown(ctx, aggregator) - if err != nil { - return err - } + var distinctExpr sqlparser.Expr + var differentExpr *sqlparser.AliasedExpr - if !canPushDownDistinct { - aggregator.DistinctExpr = distinctExpr - } + distinctAggrPushedDown := true + distinctAggrGroupByAdded := false - aeDistinctExpr := aeWrap(aggregator.DistinctExpr) - offset := -1 +outer: for i, aggr := range aggregator.Aggregations { - if aggr.Distinct && !canPushDownDistinct { - offset = aggr.ColOffset - aggrBelowRoute.Columns[offset] = aeDistinctExpr + if !aggr.Distinct { + aggrBelowRoute.Aggregations = append(aggrBelowRoute.Aggregations, aggr) + aggregateTheAggregate(aggregator, i) continue } - aggrBelowRoute.Aggregations = append(aggrBelowRoute.Aggregations, aggr) - aggregateTheAggregate(aggregator, i) - } + funcExpr := aggr.Func.GetArg() + switch aggr.OpCode { + case opcode.AggregateCountDistinct, opcode.AggregateSumDistinct: + if !exprHasUniqueVindex(ctx, funcExpr) { + distinctAggrPushedDown = false + } + if distinctExpr == nil { + distinctExpr = funcExpr + break + } + if !ctx.SemTable.EqualsExpr(distinctExpr, funcExpr) { + differentExpr = aggr.Original + } + if differentExpr != nil && !distinctAggrPushedDown { + return vterrors.VT12001(fmt.Sprintf("only one DISTINCT aggregation is allowed in a SELECT: %s", sqlparser.String(differentExpr))) + } + default: + aggrBelowRoute.Aggregations = append(aggrBelowRoute.Aggregations, aggr) + aggregateTheAggregate(aggregator, i) + continue outer + } - // everything is pushed below the route. - if canPushDownDistinct { - return nil + if distinctAggrPushedDown { + aggrBelowRoute.Aggregations = append(aggrBelowRoute.Aggregations, aggr) + aggregateTheAggregate(aggregator, i) + continue + } + + // We handle a distinct aggregation by turning it into a group by and + // doing the aggregating on the vtgate level instead + aeDistinctExpr := aeWrap(funcExpr) + aggrBelowRoute.Columns[aggr.ColOffset] = aeDistinctExpr + + // We handle a distinct aggregation by turning it into a group by and + // doing the aggregating on the vtgate level instead + // Adding to group by can be done only once even though there are multiple distinct aggregation with same expression. + if !distinctAggrGroupByAdded { + groupBy := NewGroupBy(funcExpr, funcExpr, aeDistinctExpr) + groupBy.ColOffset = aggr.ColOffset + aggrBelowRoute.Grouping = append(aggrBelowRoute.Grouping, groupBy) + distinctAggrGroupByAdded = true + } } - // We handle a distinct aggregation by turning it into a group by and - // doing the aggregating on the vtgate level instead - // Adding to group by can be done only once even though there are multiple distinct aggregation with same expression. - groupBy := NewGroupBy(aggregator.DistinctExpr, aggregator.DistinctExpr, aeDistinctExpr) - groupBy.ColOffset = offset - aggrBelowRoute.Grouping = append(aggrBelowRoute.Grouping, groupBy) + if !distinctAggrPushedDown { + aggregator.DistinctExpr = distinctExpr + } return nil } @@ -160,6 +188,13 @@ func checkIfWeCanPushDown(ctx *plancontext.PlanningContext, aggregator *Aggregat if !aggr.Distinct { continue } + + // MIN and MAX function can also have distinct expression in the function argument. + // So, they need to be skipped. + if aggr.OpCode != opcode.AggregateCountDistinct && aggr.OpCode != opcode.AggregateSumDistinct { + continue + } + innerExpr := aggr.Func.GetArg() if !exprHasUniqueVindex(ctx, innerExpr) { canPushDown = false @@ -444,11 +479,13 @@ func splitAggrColumnsToLeftAndRight( outerJoin: join.LeftJoin, } - canPushDownDistinct, distinctExpr, err := checkIfWeCanPushDown(ctx, aggregator) + canPushDown, distinctExpr, err := checkIfWeCanPushDown(ctx, aggregator) if err != nil { return nil, nil, err } - if !canPushDownDistinct { + + // + if !canPushDown { aggregator.DistinctExpr = distinctExpr return nil, nil, errAbortAggrPushing } diff --git a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json index 0dbbe645080..97f1b5c96ac 100644 --- a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json @@ -904,9 +904,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select col1, col2, weight_string(col1), weight_string(col2) from `user` where 1 != 1 group by col1, col2, weight_string(col1), weight_string(col2)", - "OrderBy": "(0|2) ASC, (1|3) ASC", - "Query": "select col1, col2, weight_string(col1), weight_string(col2) from `user` group by col1, col2, weight_string(col1), weight_string(col2) order by col1 asc, col2 asc", + "FieldQuery": "select col1, min(distinct col2), weight_string(col1), weight_string(col2) from `user` where 1 != 1 group by col1, weight_string(col1), weight_string(col2)", + "OrderBy": "(0|2) ASC", + "Query": "select col1, min(distinct col2), weight_string(col1), weight_string(col2) from `user` group by col1, weight_string(col1), weight_string(col2) order by col1 asc", "Table": "`user`" } ] From 9323dcb0bd71be7116def5b3c52ba47639c0d977 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Wed, 12 Jul 2023 14:19:38 +0530 Subject: [PATCH 6/7] rewrite min/max distinct and remove distinct, making the planning simpler Signed-off-by: Harshit Gangal --- go/vt/sqlparser/ast.go | 8 +++ go/vt/sqlparser/ast_rewriting.go | 14 +++++ .../operators/aggregation_pushing.go | 57 +++++-------------- .../planbuilder/testdata/aggr_cases.json | 49 +++++++++++++--- 4 files changed, 76 insertions(+), 52 deletions(-) diff --git a/go/vt/sqlparser/ast.go b/go/vt/sqlparser/ast.go index abc72f3f05b..b6959c983d6 100644 --- a/go/vt/sqlparser/ast.go +++ b/go/vt/sqlparser/ast.go @@ -2859,6 +2859,7 @@ type ( DistinctableAggr interface { IsDistinct() bool + SetDistinct(bool) } Count struct { @@ -3371,6 +3372,13 @@ func (avg *Avg) IsDistinct() bool { return avg.Distinct } func (count *Count) IsDistinct() bool { return count.Distinct } func (grpConcat *GroupConcatExpr) IsDistinct() bool { return grpConcat.Distinct } +func (sum *Sum) SetDistinct(distinct bool) { sum.Distinct = distinct } +func (min *Min) SetDistinct(distinct bool) { min.Distinct = distinct } +func (max *Max) SetDistinct(distinct bool) { max.Distinct = distinct } +func (avg *Avg) SetDistinct(distinct bool) { avg.Distinct = distinct } +func (count *Count) SetDistinct(distinct bool) { count.Distinct = distinct } +func (grpConcat *GroupConcatExpr) SetDistinct(distinct bool) { grpConcat.Distinct = distinct } + func (*Sum) AggrName() string { return "sum" } func (*Min) AggrName() string { return "min" } func (*Max) AggrName() string { return "max" } diff --git a/go/vt/sqlparser/ast_rewriting.go b/go/vt/sqlparser/ast_rewriting.go index 00fa7c8b740..3bf4fc191a9 100644 --- a/go/vt/sqlparser/ast_rewriting.go +++ b/go/vt/sqlparser/ast_rewriting.go @@ -364,6 +364,8 @@ func (er *astRewriter) rewriteUp(cursor *Cursor) bool { er.rewriteShowBasic(node) case *ExistsExpr: er.existsRewrite(cursor, node) + case DistinctableAggr: + er.rewriteDistinctableAggr(cursor, node) } return true } @@ -683,6 +685,18 @@ func (er *astRewriter) existsRewrite(cursor *Cursor, node *ExistsExpr) { sel.GroupBy = nil } +// rewriteDistinctableAggr removed Distinct from Max and Min Aggregations as it does not impact the result. But, makes the plan simpler. +func (er *astRewriter) rewriteDistinctableAggr(cursor *Cursor, node DistinctableAggr) { + if !node.IsDistinct() { + return + } + switch aggr := node.(type) { + case *Max, *Min: + aggr.SetDistinct(false) + er.bindVars.NoteRewrite() + } +} + func bindVarExpression(name string) Expr { return NewArgument(name) } diff --git a/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go b/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go index dea4d39f667..6fb5cc88be4 100644 --- a/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go +++ b/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go @@ -115,42 +115,15 @@ func pushDownAggregationThroughRoute( // pushDownAggregations splits aggregations between the original aggregator and the one we are pushing down func pushDownAggregations(ctx *plancontext.PlanningContext, aggregator *Aggregator, aggrBelowRoute *Aggregator) error { - var distinctExpr sqlparser.Expr - var differentExpr *sqlparser.AliasedExpr + canPushDownDistinctAggr, distinctExpr, err := checkIfWeCanPushDown(ctx, aggregator) + if err != nil { + return err + } - distinctAggrPushedDown := true distinctAggrGroupByAdded := false -outer: for i, aggr := range aggregator.Aggregations { - if !aggr.Distinct { - aggrBelowRoute.Aggregations = append(aggrBelowRoute.Aggregations, aggr) - aggregateTheAggregate(aggregator, i) - continue - } - funcExpr := aggr.Func.GetArg() - switch aggr.OpCode { - case opcode.AggregateCountDistinct, opcode.AggregateSumDistinct: - if !exprHasUniqueVindex(ctx, funcExpr) { - distinctAggrPushedDown = false - } - if distinctExpr == nil { - distinctExpr = funcExpr - break - } - if !ctx.SemTable.EqualsExpr(distinctExpr, funcExpr) { - differentExpr = aggr.Original - } - if differentExpr != nil && !distinctAggrPushedDown { - return vterrors.VT12001(fmt.Sprintf("only one DISTINCT aggregation is allowed in a SELECT: %s", sqlparser.String(differentExpr))) - } - default: - aggrBelowRoute.Aggregations = append(aggrBelowRoute.Aggregations, aggr) - aggregateTheAggregate(aggregator, i) - continue outer - } - - if distinctAggrPushedDown { + if !aggr.Distinct || canPushDownDistinctAggr { aggrBelowRoute.Aggregations = append(aggrBelowRoute.Aggregations, aggr) aggregateTheAggregate(aggregator, i) continue @@ -158,21 +131,21 @@ outer: // We handle a distinct aggregation by turning it into a group by and // doing the aggregating on the vtgate level instead - aeDistinctExpr := aeWrap(funcExpr) + aeDistinctExpr := aeWrap(distinctExpr) aggrBelowRoute.Columns[aggr.ColOffset] = aeDistinctExpr // We handle a distinct aggregation by turning it into a group by and // doing the aggregating on the vtgate level instead // Adding to group by can be done only once even though there are multiple distinct aggregation with same expression. if !distinctAggrGroupByAdded { - groupBy := NewGroupBy(funcExpr, funcExpr, aeDistinctExpr) + groupBy := NewGroupBy(distinctExpr, distinctExpr, aeDistinctExpr) groupBy.ColOffset = aggr.ColOffset aggrBelowRoute.Grouping = append(aggrBelowRoute.Grouping, groupBy) distinctAggrGroupByAdded = true } } - if !distinctAggrPushedDown { + if !canPushDownDistinctAggr { aggregator.DistinctExpr = distinctExpr } @@ -189,12 +162,6 @@ func checkIfWeCanPushDown(ctx *plancontext.PlanningContext, aggregator *Aggregat continue } - // MIN and MAX function can also have distinct expression in the function argument. - // So, they need to be skipped. - if aggr.OpCode != opcode.AggregateCountDistinct && aggr.OpCode != opcode.AggregateSumDistinct { - continue - } - innerExpr := aggr.Func.GetArg() if !exprHasUniqueVindex(ctx, innerExpr) { canPushDown = false @@ -479,13 +446,14 @@ func splitAggrColumnsToLeftAndRight( outerJoin: join.LeftJoin, } - canPushDown, distinctExpr, err := checkIfWeCanPushDown(ctx, aggregator) + canPushDownDistinctAggr, distinctExpr, err := checkIfWeCanPushDown(ctx, aggregator) if err != nil { return nil, nil, err } - // - if !canPushDown { + // Distinct aggregation cannot be pushed down in the join. + // We keep node of the distinct aggregation expression to be used later for ordering. + if !canPushDownDistinctAggr { aggregator.DistinctExpr = distinctExpr return nil, nil, errAbortAggrPushing } @@ -588,6 +556,7 @@ func (ab *aggBuilder) handleAggr(ctx *plancontext.PlanningContext, aggr Aggr) er // this is only used for SHOW GTID queries that will never contain joins return vterrors.VT13001("cannot do join with vgtid") case opcode.AggregateSumDistinct, opcode.AggregateCountDistinct: + // we are not going to see values multiple times, so we don't need to multiply with the count(*) from the other side return ab.handlePushThroughAggregation(ctx, aggr) default: return errHorizonNotPlanned() diff --git a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json index 97f1b5c96ac..07f1316c995 100644 --- a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json @@ -878,9 +878,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select col1, min(distinct col2), weight_string(col1) from `user` where 1 != 1 group by col1, weight_string(col1)", + "FieldQuery": "select col1, min(col2) as `min(distinct col2)`, weight_string(col1) from `user` where 1 != 1 group by col1, weight_string(col1)", "OrderBy": "(0|2) ASC", - "Query": "select col1, min(distinct col2), weight_string(col1) from `user` group by col1, weight_string(col1) order by col1 asc", + "Query": "select col1, min(col2) as `min(distinct col2)`, weight_string(col1) from `user` group by col1, weight_string(col1) order by col1 asc", "ResultColumns": 2, "Table": "`user`" } @@ -904,9 +904,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select col1, min(distinct col2), weight_string(col1), weight_string(col2) from `user` where 1 != 1 group by col1, weight_string(col1), weight_string(col2)", + "FieldQuery": "select col1, min(col2) as `min(distinct col2)`, weight_string(col1), weight_string(col2) from `user` where 1 != 1 group by col1, weight_string(col1), weight_string(col2)", "OrderBy": "(0|2) ASC", - "Query": "select col1, min(distinct col2), weight_string(col1), weight_string(col2) from `user` group by col1, weight_string(col1), weight_string(col2) order by col1 asc", + "Query": "select col1, min(col2) as `min(distinct col2)`, weight_string(col1), weight_string(col2) from `user` group by col1, weight_string(col1), weight_string(col2) order by col1 asc", "Table": "`user`" } ] @@ -3537,9 +3537,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select col1, min(distinct id), col3, weight_string(col1), weight_string(col3) from `user` where 1 != 1 group by col1, col3, weight_string(col1), weight_string(col3)", + "FieldQuery": "select col1, min(id) as `min(distinct id)`, col3, weight_string(col1), weight_string(col3) from `user` where 1 != 1 group by col1, col3, weight_string(col1), weight_string(col3)", "OrderBy": "(0|3) ASC, (2|4) ASC", - "Query": "select col1, min(distinct id), col3, weight_string(col1), weight_string(col3) from `user` group by col1, col3, weight_string(col1), weight_string(col3) order by col1 asc, col3 asc", + "Query": "select col1, min(id) as `min(distinct id)`, col3, weight_string(col1), weight_string(col3) from `user` group by col1, col3, weight_string(col1), weight_string(col3) order by col1 asc, col3 asc", "ResultColumns": 3, "Table": "`user`" } @@ -3563,9 +3563,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select col1, min(distinct id), col3, weight_string(col1), weight_string(id), weight_string(col3) from `user` where 1 != 1 group by col1, col3, weight_string(col1), weight_string(id), weight_string(col3)", + "FieldQuery": "select col1, min(id) as `min(distinct id)`, col3, weight_string(col1), weight_string(id), weight_string(col3) from `user` where 1 != 1 group by col1, col3, weight_string(col1), weight_string(id), weight_string(col3)", "OrderBy": "(0|3) ASC, (2|5) ASC", - "Query": "select col1, min(distinct id), col3, weight_string(col1), weight_string(id), weight_string(col3) from `user` group by col1, col3, weight_string(col1), weight_string(id), weight_string(col3) order by col1 asc, col3 asc", + "Query": "select col1, min(id) as `min(distinct id)`, col3, weight_string(col1), weight_string(id), weight_string(col3) from `user` group by col1, col3, weight_string(col1), weight_string(id), weight_string(col3) order by col1 asc, col3 asc", "Table": "`user`" } ] @@ -7097,5 +7097,38 @@ "user.user" ] } + }, + { + "comment": "count and sum distinct with min distinct on different expressions", + "query": "select foo, min(distinct bar), count(distinct baz), sum(distinct baz) from user group by foo", + "v3-plan": "VT12001: unsupported: only one DISTINCT aggregation allowed in a SELECT: sum(distinct baz)", + "gen4-plan": { + "QueryType": "SELECT", + "Original": "select foo, min(distinct bar), count(distinct baz), sum(distinct baz) from user group by foo", + "Instructions": { + "OperatorType": "Aggregate", + "Variant": "Ordered", + "Aggregates": "min(1|5) AS min(distinct bar), count_distinct(2|6) AS count(distinct baz), sum_distinct(3|6) AS sum(distinct baz)", + "GroupBy": "(0|4)", + "ResultColumns": 4, + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select foo, min(bar) as `min(distinct bar)`, baz, baz, weight_string(foo), weight_string(bar), weight_string(baz) from `user` where 1 != 1 group by foo, baz, weight_string(foo), weight_string(bar), weight_string(baz)", + "OrderBy": "(0|4) ASC, (2|6) ASC", + "Query": "select foo, min(bar) as `min(distinct bar)`, baz, baz, weight_string(foo), weight_string(bar), weight_string(baz) from `user` group by foo, baz, weight_string(foo), weight_string(bar), weight_string(baz) order by foo asc, baz asc", + "Table": "`user`" + } + ] + }, + "TablesUsed": [ + "user.user" + ] + } } ] From 4367267e171c8b726ac16d2b834385d66076f59a Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Wed, 12 Jul 2023 14:31:54 +0530 Subject: [PATCH 7/7] added unit test Signed-off-by: Harshit Gangal --- go/vt/sqlparser/ast_rewriting_test.go | 3 +++ .../vtgate/planbuilder/testdata/aggr_cases.json | 16 ++++++++-------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/go/vt/sqlparser/ast_rewriting_test.go b/go/vt/sqlparser/ast_rewriting_test.go index 6fe59acbc85..2f34f21f74d 100644 --- a/go/vt/sqlparser/ast_rewriting_test.go +++ b/go/vt/sqlparser/ast_rewriting_test.go @@ -289,6 +289,9 @@ func TestRewrites(in *testing.T) { }, { in: "SELECT id, name, salary FROM user_details", expected: "SELECT id, name, salary FROM (select user.id, user.name, user_extra.salary from user join user_extra where user.id = user_extra.user_id) as user_details", + }, { + in: "select max(distinct c1), min(distinct c2), avg(distinct c3), sum(distinct c4), count(distinct c5), group_concat(distinct c6) from tbl", + expected: "select max(c1) as `max(distinct c1)`, min(c2) as `min(distinct c2)`, avg(distinct c3), sum(distinct c4), count(distinct c5), group_concat(distinct c6) from tbl", }, { in: "SHOW VARIABLES", expected: "SHOW VARIABLES", diff --git a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json index 07f1316c995..f5501e9f06b 100644 --- a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json @@ -7100,17 +7100,17 @@ }, { "comment": "count and sum distinct with min distinct on different expressions", - "query": "select foo, min(distinct bar), count(distinct baz), sum(distinct baz) from user group by foo", + "query": "select foo, min(distinct bar), count(distinct baz), sum(distinct baz), max(distinct toto) from user group by foo", "v3-plan": "VT12001: unsupported: only one DISTINCT aggregation allowed in a SELECT: sum(distinct baz)", "gen4-plan": { "QueryType": "SELECT", - "Original": "select foo, min(distinct bar), count(distinct baz), sum(distinct baz) from user group by foo", + "Original": "select foo, min(distinct bar), count(distinct baz), sum(distinct baz), max(distinct toto) from user group by foo", "Instructions": { "OperatorType": "Aggregate", "Variant": "Ordered", - "Aggregates": "min(1|5) AS min(distinct bar), count_distinct(2|6) AS count(distinct baz), sum_distinct(3|6) AS sum(distinct baz)", - "GroupBy": "(0|4)", - "ResultColumns": 4, + "Aggregates": "min(1|6) AS min(distinct bar), count_distinct(2|7) AS count(distinct baz), sum_distinct(3|7) AS sum(distinct baz), max(4|8) AS max(distinct toto)", + "GroupBy": "(0|5)", + "ResultColumns": 5, "Inputs": [ { "OperatorType": "Route", @@ -7119,9 +7119,9 @@ "Name": "user", "Sharded": true }, - "FieldQuery": "select foo, min(bar) as `min(distinct bar)`, baz, baz, weight_string(foo), weight_string(bar), weight_string(baz) from `user` where 1 != 1 group by foo, baz, weight_string(foo), weight_string(bar), weight_string(baz)", - "OrderBy": "(0|4) ASC, (2|6) ASC", - "Query": "select foo, min(bar) as `min(distinct bar)`, baz, baz, weight_string(foo), weight_string(bar), weight_string(baz) from `user` group by foo, baz, weight_string(foo), weight_string(bar), weight_string(baz) order by foo asc, baz asc", + "FieldQuery": "select foo, min(bar) as `min(distinct bar)`, baz, baz, max(toto) as `max(distinct toto)`, weight_string(foo), weight_string(bar), weight_string(baz), weight_string(toto) from `user` where 1 != 1 group by foo, baz, weight_string(foo), weight_string(bar), weight_string(baz), weight_string(toto)", + "OrderBy": "(0|5) ASC, (2|7) ASC", + "Query": "select foo, min(bar) as `min(distinct bar)`, baz, baz, max(toto) as `max(distinct toto)`, weight_string(foo), weight_string(bar), weight_string(baz), weight_string(toto) from `user` group by foo, baz, weight_string(foo), weight_string(bar), weight_string(baz), weight_string(toto) order by foo asc, baz asc", "Table": "`user`" } ]