Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions data/test/vtgate/aggr_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,49 @@
}
}

# scatter group by a text column
"select count(*), a, textcol1, b from user group by a, textcol1, b"
{
"Original": "select count(*), a, textcol1, b from user group by a, textcol1, b",
"Instructions": {
"Aggregates": [
{
"Opcode": "count",
"Col": 0
}
],
"Keys": [
1,
4,
3
],
"TruncateColumnCount": 4,
"Input": {
"Opcode": "SelectScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"Query": "select count(*), a, textcol1, b, weight_string(textcol1) from user group by a, textcol1, b order by a asc, textcol1 asc, b asc",
"FieldQuery": "select count(*), a, textcol1, b, weight_string(textcol1) from user where 1 != 1 group by a, textcol1, b",
"OrderBy": [
{
"Col": 1,
"Desc": false
},
{
"Col": 4,
"Desc": false
},
{
"Col": 3,
"Desc": false
}
]
}
}
}

# count aggregate
"select count(*) from user"
{
Expand Down
94 changes: 94 additions & 0 deletions data/test/vtgate/postprocess_cases.txt
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,100 @@
}
}

# ORDER BY on scatter with text column
"select a, textcol1, b from user order by a, textcol1, b"
{
"Original": "select a, textcol1, b from user order by a, textcol1, b",
"Instructions": {
"Opcode": "SelectScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"Query": "select a, textcol1, b, weight_string(textcol1) from user order by a asc, textcol1 asc, b asc",
"FieldQuery": "select a, textcol1, b, weight_string(textcol1) from user where 1 != 1",
"OrderBy": [
{
"Col": 0,
"Desc": false
},
{
"Col": 3,
"Desc": false
},
{
"Col": 2,
"Desc": false
}
],
"TruncateColumnCount": 3
}
}

# ORDER BY on scatter with text column, qualified name
"select a, user.textcol1, b from user order by a, textcol1, b"
{
"Original": "select a, user.textcol1, b from user order by a, textcol1, b",
"Instructions": {
"Opcode": "SelectScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"Query": "select a, user.textcol1, b, weight_string(user.textcol1) from user order by a asc, textcol1 asc, b asc",
"FieldQuery": "select a, user.textcol1, b, weight_string(user.textcol1) from user where 1 != 1",
"OrderBy": [
{
"Col": 0,
"Desc": false
},
{
"Col": 3,
"Desc": false
},
{
"Col": 2,
"Desc": false
}
],
"TruncateColumnCount": 3
}
}

# ORDER BY on scatter with multiple text columns
"select a, textcol1, b, textcol2 from user order by a, textcol1, b, textcol2"
{
"Original": "select a, textcol1, b, textcol2 from user order by a, textcol1, b, textcol2",
"Instructions": {
"Opcode": "SelectScatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"Query": "select a, textcol1, b, textcol2, weight_string(textcol1), weight_string(textcol2) from user order by a asc, textcol1 asc, b asc, textcol2 asc",
"FieldQuery": "select a, textcol1, b, textcol2, weight_string(textcol1), weight_string(textcol2) from user where 1 != 1",
"OrderBy": [
{
"Col": 0,
"Desc": false
},
{
"Col": 4,
"Desc": false
},
{
"Col": 2,
"Desc": false
},
{
"Col": 5,
"Desc": false
}
],
"TruncateColumnCount": 4
}
}

# ORDER BY invalid col number on scatter
"select col from user order by 2"
"column number out of range: 2"
Expand Down
8 changes: 8 additions & 0 deletions data/test/vtgate/schema_test.json
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@
},
{
"name": "predef2"
},
{
"name": "textcol1",
"type": "VARCHAR"
},
{
"name": "textcol2",
"type": "VARCHAR"
}
]
},
Expand Down
35 changes: 35 additions & 0 deletions go/sqltypes/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,41 @@ func CopyRow(r []Value) []Value {
return out
}

// Truncate returns a new Result with all the rows truncated
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you rephrase this as "truncated to the specified number of columns".
I was initially confused thinking that this would truncate the individual fields.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// to the specified number of columns.
func (result *Result) Truncate(l int) *Result {
if l == 0 {
return result
}

out := &Result{
InsertID: result.InsertID,
RowsAffected: result.RowsAffected,
}
if result.Fields != nil {
out.Fields = result.Fields[:l]
}
if result.Rows != nil {
out.Rows = make([][]Value, 0, len(result.Rows))
for _, r := range result.Rows {
out.Rows = append(out.Rows, r[:l])
}
}
if result.Extras != nil {
out.Extras = &querypb.ResultExtras{
Fresher: result.Extras.Fresher,
}
if result.Extras.EventToken != nil {
out.Extras.EventToken = &querypb.EventToken{
Timestamp: result.Extras.EventToken.Timestamp,
Shard: result.Extras.EventToken.Shard,
Position: result.Extras.EventToken.Position,
}
}
}
return out
}

// FieldsEqual compares two arrays of fields.
// reflect.DeepEqual shouldn't be used because of the protos.
func FieldsEqual(f1, f2 []*querypb.Field) bool {
Expand Down
55 changes: 55 additions & 0 deletions go/sqltypes/result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,61 @@ func TestCopy(t *testing.T) {
}
}

func TestTruncate(t *testing.T) {
in := &Result{
Fields: []*querypb.Field{{
Type: Int64,
}, {
Type: VarChar,
}},
InsertID: 1,
RowsAffected: 2,
Rows: [][]Value{
{TestValue(Int64, "1"), MakeTrusted(Null, nil)},
{TestValue(Int64, "2"), MakeTrusted(VarChar, nil)},
{TestValue(Int64, "3"), TestValue(VarChar, "")},
},
Extras: &querypb.ResultExtras{
EventToken: &querypb.EventToken{
Timestamp: 123,
Shard: "sh",
Position: "po",
},
Fresher: true,
},
}

out := in.Truncate(0)
if !reflect.DeepEqual(out, in) {
t.Errorf("Truncate(0):\n%v, want\n%v", out, in)
}

out = in.Truncate(1)
want := &Result{
Fields: []*querypb.Field{{
Type: Int64,
}},
InsertID: 1,
RowsAffected: 2,
Rows: [][]Value{
{TestValue(Int64, "1")},
{TestValue(Int64, "2")},
{TestValue(Int64, "3")},
},
Extras: &querypb.ResultExtras{
EventToken: &querypb.EventToken{
Timestamp: 123,
Shard: "sh",
Position: "po",
},
Fresher: true,
},
}
if !reflect.DeepEqual(out, want) {
t.Errorf("Truncate(1):\n%v, want\n%v", out, want)
}
}

func TestStripMetaData(t *testing.T) {
testcases := []struct {
name string
Expand Down
31 changes: 27 additions & 4 deletions go/vt/vtgate/engine/ordered_aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"

"github.com/youtube/vitess/go/sqltypes"

querypb "github.com/youtube/vitess/go/vt/proto/query"
)

Expand All @@ -39,6 +40,11 @@ type OrderedAggregate struct {
// the aggregation key.
Keys []int

// TruncateColumnCount specifies the number of columns to return
// in the final result. Rest of the columns are truncated
// from the result received. If 0, no truncation happens.
TruncateColumnCount int `json:",omitempty"`

// Input is the primitive that will feed into this Primitive.
Input Primitive
}
Expand Down Expand Up @@ -87,6 +93,14 @@ func (code AggregateOpcode) MarshalJSON() ([]byte, error) {

// Execute is a Primitive function.
func (oa *OrderedAggregate) Execute(vcursor VCursor, bindVars, joinVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
qr, err := oa.execute(vcursor, bindVars, joinVars, wantfields)
if err != nil {
return nil, err
}
return qr.Truncate(oa.TruncateColumnCount), nil
}

func (oa *OrderedAggregate) execute(vcursor VCursor, bindVars, joinVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
result, err := oa.Input.Execute(vcursor, bindVars, joinVars, wantfields)
if err != nil {
return nil, err
Expand Down Expand Up @@ -130,10 +144,15 @@ func (oa *OrderedAggregate) Execute(vcursor VCursor, bindVars, joinVars map[stri
func (oa *OrderedAggregate) StreamExecute(vcursor VCursor, bindVars, joinVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
var current []sqltypes.Value
var fields []*querypb.Field

cb := func(qr *sqltypes.Result) error {
return callback(qr.Truncate(oa.TruncateColumnCount))
}

err := oa.Input.StreamExecute(vcursor, bindVars, joinVars, wantfields, func(qr *sqltypes.Result) error {
if len(qr.Fields) != 0 {
fields = qr.Fields
if err := callback(&sqltypes.Result{Fields: qr.Fields}); err != nil {
if err := cb(&sqltypes.Result{Fields: fields}); err != nil {
return err
}
}
Expand All @@ -156,7 +175,7 @@ func (oa *OrderedAggregate) StreamExecute(vcursor VCursor, bindVars, joinVars ma
}
continue
}
if err := callback(&sqltypes.Result{Rows: [][]sqltypes.Value{current}}); err != nil {
if err := cb(&sqltypes.Result{Rows: [][]sqltypes.Value{current}}); err != nil {
return err
}
current = row
Expand All @@ -168,7 +187,7 @@ func (oa *OrderedAggregate) StreamExecute(vcursor VCursor, bindVars, joinVars ma
}

if current != nil {
if err := callback(&sqltypes.Result{Rows: [][]sqltypes.Value{current}}); err != nil {
if err := cb(&sqltypes.Result{Rows: [][]sqltypes.Value{current}}); err != nil {
return err
}
}
Expand All @@ -177,7 +196,11 @@ func (oa *OrderedAggregate) StreamExecute(vcursor VCursor, bindVars, joinVars ma

// GetFields is a Primitive function.
func (oa *OrderedAggregate) GetFields(vcursor VCursor, bindVars, joinVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return oa.Input.GetFields(vcursor, bindVars, joinVars)
qr, err := oa.Input.GetFields(vcursor, bindVars, joinVars)
if err != nil {
return nil, err
}
return qr.Truncate(oa.TruncateColumnCount), nil
}

func (oa *OrderedAggregate) keysEqual(row1, row2 []sqltypes.Value) (bool, error) {
Expand Down
Loading