Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ func TestAggrWithLimit(t *testing.T) {
mcmp.Exec(fmt.Sprintf("insert into aggr_test(id, val1, val2) values(%d, 'a', %d)", i, r))
}
mcmp.Exec("select val2, count(*) from aggr_test group by val2 order by count(*), val2 limit 10")
if utils.BinaryIsAtLeastAtVersion(21, "vtgate") {
mcmp.Exec("SELECT 1 AS `id`, COUNT(*) FROM (SELECT `id` FROM aggr_test WHERE val1 = 1 LIMIT 100) `t`")
}
}

func TestAggregateTypes(t *testing.T) {
Expand Down
15 changes: 15 additions & 0 deletions go/vt/sqlparser/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package sqlparser
// analyzer.go contains utility analysis functions.

import (
"errors"
"fmt"
"strings"
"unicode"
Expand Down Expand Up @@ -383,6 +384,20 @@ func IsColName(node Expr) bool {
return ok
}

var errNotStatic = errors.New("not static")

// IsConstant returns true if the Expr can be evaluated without input or access to tables.
func IsConstant(node Expr) bool {
err := Walk(func(node SQLNode) (kontinue bool, err error) {
switch node.(type) {
case *ColName, *Subquery:
return false, errNotStatic
}
return true, nil
}, node)
return err == nil
}

// IsValue returns true if the Expr is a string, integral or value arg.
// NULL is not considered to be a value.
func IsValue(node Expr) bool {
Expand Down
154 changes: 108 additions & 46 deletions go/vt/vtgate/engine/aggregations.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ import (
// It contains the opcode and input column number.
type AggregateParams struct {
Opcode AggregateOpcode
Col int
// Input source specification - exactly one of these should be set:
// Col: Column index for simple column references (e.g., SUM(column_name))
// EExpr: Evaluated expression for literals, parameters
Col int
EExpr evalengine.Expr

// These are used only for distinct opcodes.
KeyCol int
Expand All @@ -53,15 +57,26 @@ type AggregateParams struct {
CollationEnv *collations.Environment
}

func NewAggregateParam(opcode AggregateOpcode, col int, alias string, collationEnv *collations.Environment) *AggregateParams {
// NewAggregateParam creates a new aggregate param
func NewAggregateParam(
oc AggregateOpcode,
col int,
expr evalengine.Expr,
alias string,
collationEnv *collations.Environment,
) *AggregateParams {
if expr != nil && oc != AggregateConstant {
panic(vterrors.VT13001("expr should be nil"))
}
out := &AggregateParams{
Opcode: opcode,
Opcode: oc,
Col: col,
EExpr: expr,
Alias: alias,
WCol: -1,
CollationEnv: collationEnv,
}
if opcode.NeedsComparableValues() {
if oc.NeedsComparableValues() {
out.KeyCol = col
}
return out
Expand All @@ -73,6 +88,9 @@ func (ap *AggregateParams) WAssigned() bool {

func (ap *AggregateParams) String() string {
keyCol := strconv.Itoa(ap.Col)
if ap.EExpr != nil {
keyCol = sqlparser.String(ap.EExpr)
}
if ap.WAssigned() {
keyCol = fmt.Sprintf("%s|%d", keyCol, ap.WCol)
}
Expand All @@ -89,7 +107,14 @@ func (ap *AggregateParams) String() string {
return fmt.Sprintf("%s%s(%s)", ap.Opcode.String(), dispOrigOp, keyCol)
}

func (ap *AggregateParams) typ(inputType querypb.Type) querypb.Type {
func (ap *AggregateParams) typ(inputType querypb.Type, env *evalengine.ExpressionEnv, collID collations.ID) querypb.Type {
if ap.EExpr != nil {
value, err := eval(env, ap.EExpr, collID)
if err != nil {
return sqltypes.Unknown
}
return value.Type()
}
if ap.OrigOpcode != AggregateUnassigned {
return ap.OrigOpcode.SQLType(inputType)
}
Expand All @@ -98,7 +123,7 @@ func (ap *AggregateParams) typ(inputType querypb.Type) querypb.Type {

type aggregator interface {
add(row []sqltypes.Value) error
finish() sqltypes.Value
finish(env *evalengine.ExpressionEnv, coll collations.ID) (sqltypes.Value, error)
reset()
}

Expand Down Expand Up @@ -151,8 +176,8 @@ func (a *aggregatorCount) add(row []sqltypes.Value) error {
return nil
}

func (a *aggregatorCount) finish() sqltypes.Value {
return sqltypes.NewInt64(a.n)
func (a *aggregatorCount) finish(*evalengine.ExpressionEnv, collations.ID) (sqltypes.Value, error) {
return sqltypes.NewInt64(a.n), nil
}

func (a *aggregatorCount) reset() {
Expand All @@ -164,13 +189,13 @@ type aggregatorCountStar struct {
n int64
}

func (a *aggregatorCountStar) add(_ []sqltypes.Value) error {
func (a *aggregatorCountStar) add([]sqltypes.Value) error {
a.n++
return nil
}

func (a *aggregatorCountStar) finish() sqltypes.Value {
return sqltypes.NewInt64(a.n)
func (a *aggregatorCountStar) finish(*evalengine.ExpressionEnv, collations.ID) (sqltypes.Value, error) {
return sqltypes.NewInt64(a.n), nil
}

func (a *aggregatorCountStar) reset() {
Expand Down Expand Up @@ -198,8 +223,8 @@ func (a *aggregatorMax) add(row []sqltypes.Value) (err error) {
return a.minmax.Max(row[a.from])
}

func (a *aggregatorMinMax) finish() sqltypes.Value {
return a.minmax.Result()
func (a *aggregatorMinMax) finish(*evalengine.ExpressionEnv, collations.ID) (sqltypes.Value, error) {
return a.minmax.Result(), nil
}

func (a *aggregatorMinMax) reset() {
Expand All @@ -222,8 +247,8 @@ func (a *aggregatorSum) add(row []sqltypes.Value) error {
return a.sum.Add(row[a.from])
}

func (a *aggregatorSum) finish() sqltypes.Value {
return a.sum.Result()
func (a *aggregatorSum) finish(*evalengine.ExpressionEnv, collations.ID) (sqltypes.Value, error) {
return a.sum.Result(), nil
}

func (a *aggregatorSum) reset() {
Expand All @@ -232,28 +257,51 @@ func (a *aggregatorSum) reset() {
}

type aggregatorScalar struct {
from int
current sqltypes.Value
init bool
from int
current sqltypes.Value
hasValue bool
}

func (a *aggregatorScalar) add(row []sqltypes.Value) error {
if !a.init {
if !a.hasValue {
a.current = row[a.from]
a.init = true
a.hasValue = true
}
return nil
}

func (a *aggregatorScalar) finish() sqltypes.Value {
return a.current
func (a *aggregatorScalar) finish(*evalengine.ExpressionEnv, collations.ID) (sqltypes.Value, error) {
return a.current, nil
}

func (a *aggregatorScalar) reset() {
a.current = sqltypes.NULL
a.init = false
a.hasValue = false
}

type aggregatorConstant struct {
expr evalengine.Expr
}

func (*aggregatorConstant) add([]sqltypes.Value) error {
return nil
}

func (a *aggregatorConstant) finish(env *evalengine.ExpressionEnv, coll collations.ID) (sqltypes.Value, error) {
return eval(env, a.expr, coll)
}

func eval(env *evalengine.ExpressionEnv, eexpr evalengine.Expr, coll collations.ID) (sqltypes.Value, error) {
v, err := env.Evaluate(eexpr)
if err != nil {
return sqltypes.Value{}, err
}

return v.Value(coll), nil
}

func (*aggregatorConstant) reset() {}

type aggregatorGroupConcat struct {
from int
type_ sqltypes.Type
Expand All @@ -275,11 +323,11 @@ func (a *aggregatorGroupConcat) add(row []sqltypes.Value) error {
return nil
}

func (a *aggregatorGroupConcat) finish() sqltypes.Value {
func (a *aggregatorGroupConcat) finish(*evalengine.ExpressionEnv, collations.ID) (sqltypes.Value, error) {
if a.n == 0 {
return sqltypes.NULL
return sqltypes.NULL, nil
}
return sqltypes.MakeTrusted(a.type_, a.concat)
return sqltypes.MakeTrusted(a.type_, a.concat), nil
}

func (a *aggregatorGroupConcat) reset() {
Expand All @@ -301,36 +349,44 @@ func (a *aggregatorGtid) add(row []sqltypes.Value) error {
return nil
}

func (a *aggregatorGtid) finish() sqltypes.Value {
func (a *aggregatorGtid) finish(*evalengine.ExpressionEnv, collations.ID) (sqltypes.Value, error) {
gtid := binlogdatapb.VGtid{ShardGtids: a.shards}
return sqltypes.NewVarChar(gtid.String())
return sqltypes.NewVarChar(gtid.String()), nil
}

func (a *aggregatorGtid) reset() {
a.shards = a.shards[:0] // safe to reuse because only the serialized form of a.shards is returned
}

type aggregationState []aggregator
type aggregationState struct {
env *evalengine.ExpressionEnv
aggregators []aggregator
coll collations.ID
}

func (a aggregationState) add(row []sqltypes.Value) error {
for _, st := range a {
func (a *aggregationState) add(row []sqltypes.Value) error {
for _, st := range a.aggregators {
if err := st.add(row); err != nil {
return err
}
}
return nil
}

func (a aggregationState) finish() (row []sqltypes.Value) {
row = make([]sqltypes.Value, 0, len(a))
for _, st := range a {
row = append(row, st.finish())
func (a *aggregationState) finish() ([]sqltypes.Value, error) {
row := make([]sqltypes.Value, 0, len(a.aggregators))
for _, st := range a.aggregators {
v, err := st.finish(a.env, a.coll)
if err != nil {
return nil, err
}
row = append(row, v)
}
return
return row, nil
}

func (a aggregationState) reset() {
for _, st := range a {
func (a *aggregationState) reset() {
for _, st := range a.aggregators {
st.reset()
}
}
Expand All @@ -354,13 +410,16 @@ func isComparable(typ sqltypes.Type) bool {
return false
}

func newAggregation(fields []*querypb.Field, aggregates []*AggregateParams) (aggregationState, []*querypb.Field, error) {
func newAggregation(fields []*querypb.Field, aggregates []*AggregateParams, env *evalengine.ExpressionEnv, collation collations.ID) (*aggregationState, []*querypb.Field, error) {
fields = slice.Map(fields, func(from *querypb.Field) *querypb.Field { return from.CloneVT() })

agstate := make([]aggregator, len(fields))
aggregators := make([]aggregator, len(fields))
for _, aggr := range aggregates {
sourceType := fields[aggr.Col].Type
targetType := aggr.typ(sourceType)
var sourceType querypb.Type
if aggr.Col < len(fields) {
sourceType = fields[aggr.Col].Type
}
targetType := aggr.typ(sourceType, env, collation)

var ag aggregator
var distinct = -1
Expand Down Expand Up @@ -444,22 +503,25 @@ func newAggregation(fields []*querypb.Field, aggregates []*AggregateParams) (agg
separator: separator,
}

case AggregateConstant:
ag = &aggregatorConstant{expr: aggr.EExpr}

default:
panic("BUG: unexpected Aggregation opcode")
}

agstate[aggr.Col] = ag
aggregators[aggr.Col] = ag
fields[aggr.Col].Type = targetType
if aggr.Alias != "" {
fields[aggr.Col].Name = aggr.Alias
}
}

for i, a := range agstate {
for i, a := range aggregators {
if a == nil {
agstate[i] = &aggregatorScalar{from: i}
aggregators[i] = &aggregatorScalar{from: i}
}
}

return agstate, fields, nil
return &aggregationState{aggregators: aggregators, env: env, coll: collation}, fields, nil
}
6 changes: 5 additions & 1 deletion go/vt/vtgate/engine/cached_size.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading