Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions go/vt/vtgate/engine/concatenate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vterrors"
)

Expand Down Expand Up @@ -56,6 +57,16 @@ func (c *Concatenate) GetTableName() string {
return res
}

// GetExecShards lists all the shards that would be accessed by this primitive
func (c *Concatenate) GetExecShards(vcursor VCursor, bindVars map[string]*querypb.BindVariable, each func(rs *srvtopo.ResolvedShard)) error {
for _, src := range c.Sources {
if err := src.GetExecShards(vcursor, bindVars, each); err != nil {
return err
}
}
return nil
}

func formatTwoOptionsNicely(a, b string) string {
if a == b {
return a
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtgate/engine/dbddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ func (c *DBDDL) GetTableName() string {
return ""
}

// GetExecShards lists all the shards that would be accessed by this primitive
func (c *DBDDL) GetExecShards(vcursor VCursor, bindVars map[string]*querypb.BindVariable, each func(rs *srvtopo.ResolvedShard)) error {
// The DBDDL primitive is not shard-aware, it acts globally on the cluster
return nil
}

// Execute implements the Primitive interface
func (c *DBDDL) Execute(vcursor VCursor, _ map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
name := vcursor.GetDBDDLPluginName()
Expand Down
34 changes: 27 additions & 7 deletions go/vt/vtgate/engine/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)
Expand Down Expand Up @@ -76,6 +77,15 @@ func (ddl *DDL) GetTableName() string {
return ddl.DDL.GetTable().Name.String()
}

// GetExecShards lists all the shards that would be accessed by this primitive
func (ddl *DDL) GetExecShards(vcursor VCursor, bindVars map[string]*query.BindVariable, each func(rs *srvtopo.ResolvedShard)) error {
primitiveDDL, err := ddl.getPrimitiveToExecute(vcursor, false)
if err != nil {
return err
}
return primitiveDDL.GetExecShards(vcursor, bindVars, each)
}

// IsOnlineSchemaDDL returns true if the query is an online schema change DDL
func (ddl *DDL) isOnlineSchemaDDL() bool {
switch ddl.DDL.GetAction() {
Expand All @@ -85,12 +95,13 @@ func (ddl *DDL) isOnlineSchemaDDL() bool {
return false
}

// Execute implements the Primitive interface
func (ddl *DDL) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool) (result *sqltypes.Result, err error) {
func (ddl *DDL) getPrimitiveToExecute(vcursor VCursor, updateSession bool) (Primitive, error) {
if ddl.CreateTempTable {
vcursor.Session().HasCreatedTempTable()
vcursor.Session().NeedsReservedConn()
return ddl.NormalDDL.Execute(vcursor, bindVars, wantfields)
if updateSession {
vcursor.Session().HasCreatedTempTable()
vcursor.Session().NeedsReservedConn()
}
return ddl.NormalDDL, nil
}

ddlStrategySetting, err := schema.ParseDDLStrategy(vcursor.Session().GetDDLStrategy())
Expand All @@ -104,13 +115,22 @@ func (ddl *DDL) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable
if !ddl.OnlineDDLEnabled {
return nil, schema.ErrOnlineDDLDisabled
}
return ddl.OnlineDDL.Execute(vcursor, bindVars, wantfields)
return ddl.OnlineDDL, nil
default: // non online-ddl
if !ddl.DirectDDLEnabled {
return nil, schema.ErrDirectDDLDisabled
}
return ddl.NormalDDL.Execute(vcursor, bindVars, wantfields)
return ddl.NormalDDL, nil
}
}

// Execute implements the Primitive interface
func (ddl *DDL) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool) (result *sqltypes.Result, err error) {
primitiveDDL, err := ddl.getPrimitiveToExecute(vcursor, true)
if err != nil {
return nil, err
}
return primitiveDDL.Execute(vcursor, bindVars, wantfields)
}

// StreamExecute implements the Primitive interface
Expand Down
54 changes: 54 additions & 0 deletions go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,60 @@ func (del *Delete) GetTableName() string {
return ""
}

// GetExecShards lists all the shards that would be accessed by this primitive
func (del *Delete) GetExecShards(vcursor VCursor, bindVars map[string]*querypb.BindVariable, each func(rs *srvtopo.ResolvedShard)) error {
switch del.Opcode {
case Unsharded:
rss, _, err := vcursor.ResolveDestinations(del.Keyspace.Name, nil, []key.Destination{key.DestinationAllShards{}})
if err != nil {
return err
}
each(rss[0])
return nil
case Equal:
key, err := del.Values[0].ResolveValue(bindVars)
if err != nil {
return err
}
rs, _, err := resolveSingleShard(vcursor, del.Vindex, del.Keyspace, key)
if err != nil {
return err
}
each(rs)
return nil
case In:
rss, _, err := resolveMultiValueShards(vcursor, del.Keyspace, del.Query, bindVars, del.Values[0], del.Vindex)
if err != nil {
return err
}
for _, rs := range rss {
each(rs)
}
return nil
case Scatter:
rss, _, err := vcursor.ResolveDestinations(del.Keyspace.Name, nil, []key.Destination{key.DestinationAllShards{}})
if err != nil {
return err
}
for _, rs := range rss {
each(rs)
}
return nil
case ByDestination:
rss, _, err := vcursor.ResolveDestinations(del.Keyspace.Name, nil, []key.Destination{del.TargetDestination})
if err != nil {
return err
}
for _, rs := range rss {
each(rs)
}
return nil
default:
// Unreachable.
return fmt.Errorf("unsupported opcode: %v", del)
}
}

// Execute performs a non-streaming exec.
func (del *Delete) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
if del.QueryTimeout != 0 {
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtgate/engine/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package engine
import (
"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vtgate/evalengine"
)

Expand Down Expand Up @@ -154,6 +155,11 @@ func (d *Distinct) GetTableName() string {
return d.Source.GetTableName()
}

// GetExecShards lists all the shards that would be accessed by this primitive
func (d *Distinct) GetExecShards(vcursor VCursor, bindVars map[string]*querypb.BindVariable, each func(rs *srvtopo.ResolvedShard)) error {
return d.Source.GetExecShards(vcursor, bindVars, each)
}

// GetFields implements the Primitive interface
func (d *Distinct) GetFields(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return d.Source.GetFields(vcursor, bindVars)
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vtgate/engine/fake_primitive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/srvtopo"

querypb "vitess.io/vitess/go/vt/proto/query"
)
Expand Down Expand Up @@ -63,6 +64,10 @@ func (f *fakePrimitive) GetTableName() string {
return "fakeTable"
}

func (f *fakePrimitive) GetExecShards(vcursor VCursor, bindVars map[string]*querypb.BindVariable, each func(rs *srvtopo.ResolvedShard)) error {
return nil
}

func (f *fakePrimitive) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
f.log = append(f.log, fmt.Sprintf("Execute %v %v", printBindVars(bindVars), wantfields))
if f.results == nil {
Expand Down
24 changes: 24 additions & 0 deletions go/vt/vtgate/engine/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,30 @@ func (ins *Insert) GetTableName() string {
return ""
}

// GetExecShards lists all the shards that would be accessed by this primitive
func (ins *Insert) GetExecShards(vcursor VCursor, bindVars map[string]*querypb.BindVariable, each func(rs *srvtopo.ResolvedShard)) error {
switch ins.Opcode {
case InsertUnsharded:
rss, _, err := vcursor.ResolveDestinations(ins.Keyspace.Name, nil, []key.Destination{key.DestinationAllShards{}})
if err != nil {
return err
}
each(rss[0])
return nil
case InsertSharded, InsertShardedIgnore:
rss, _, err := ins.getInsertShardedRoute(vcursor, bindVars)
if err != nil {
return err
}
for _, rs := range rss {
each(rs)
}
return nil
default:
return fmt.Errorf("unsupported query route: %v", ins)
}
}

// Execute performs a non-streaming exec.
func (ins *Insert) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
if ins.QueryTimeout != 0 {
Expand Down
12 changes: 12 additions & 0 deletions go/vt/vtgate/engine/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/srvtopo"
)

var _ Primitive = (*Join)(nil)
Expand Down Expand Up @@ -168,6 +169,17 @@ func (jn *Join) GetFields(vcursor VCursor, bindVars map[string]*querypb.BindVari
return result, nil
}

// GetExecShards lists all the shards that would be accessed by this primitive
func (jn *Join) GetExecShards(vcursor VCursor, bindVars map[string]*querypb.BindVariable, each func(rs *srvtopo.ResolvedShard)) error {
if err := jn.Left.GetExecShards(vcursor, bindVars, each); err != nil {
return err
}
if err := jn.Right.GetExecShards(vcursor, bindVars, each); err != nil {
return err
}
return nil
}

// Inputs returns the input primitives for this join
func (jn *Join) Inputs() []Primitive {
return []Primitive{jn.Left, jn.Right}
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtgate/engine/limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io"

"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vtgate/evalengine"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -51,6 +52,11 @@ func (l *Limit) GetTableName() string {
return l.Input.GetTableName()
}

// GetExecShards lists all the shards that would be accessed by this primitive
func (l *Limit) GetExecShards(vcursor VCursor, bindVars map[string]*querypb.BindVariable, each func(rs *srvtopo.ResolvedShard)) error {
return l.Input.GetExecShards(vcursor, bindVars, each)
}

// Execute satisfies the Primtive interface.
func (l *Limit) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
count, err := l.fetchCount(bindVars)
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vtgate/engine/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"vitess.io/vitess/go/vt/key"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)
Expand Down Expand Up @@ -58,6 +59,19 @@ func (l *Lock) GetTableName() string {
return "dual"
}

// GetExecShards lists all the shards that would be accessed by this primitive
func (l *Lock) GetExecShards(vcursor VCursor, bindVars map[string]*querypb.BindVariable, each func(rs *srvtopo.ResolvedShard)) error {
rss, _, err := vcursor.ResolveDestinations(l.Keyspace.Name, nil, []key.Destination{l.TargetDestination})
if err != nil {
return err
}
if len(rss) != 1 {
return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "lock query can be routed to single shard only: %v", rss)
}
each(rss[0])
return nil
}

// Execute is part of the Primitive interface
func (l *Lock) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
rss, _, err := vcursor.ResolveDestinations(l.Keyspace.Name, nil, []key.Destination{l.TargetDestination})
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtgate/engine/memory_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"sort"
"strings"

"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vtgate/evalengine"

"vitess.io/vitess/go/sqltypes"
Expand Down Expand Up @@ -59,6 +60,11 @@ func (ms *MemorySort) GetTableName() string {
return ms.Input.GetTableName()
}

// GetExecShards lists all the shards that would be accessed by this primitive
func (ms *MemorySort) GetExecShards(vcursor VCursor, bindVars map[string]*querypb.BindVariable, each func(rs *srvtopo.ResolvedShard)) error {
return ms.Input.GetExecShards(vcursor, bindVars, each)
}

// SetTruncateColumnCount sets the truncate column count.
func (ms *MemorySort) SetTruncateColumnCount(count int) {
ms.TruncateColumnCount = count
Expand Down
16 changes: 14 additions & 2 deletions go/vt/vtgate/engine/merge_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ package engine

import (
"container/heap"
"context"
"io"

"vitess.io/vitess/go/mysql"

"context"
"vitess.io/vitess/go/vt/srvtopo"

"vitess.io/vitess/go/sqltypes"

Expand Down Expand Up @@ -65,6 +65,18 @@ func (ms *MergeSort) GetKeyspaceName() string { return "" }
// GetTableName satisfies Primitive.
func (ms *MergeSort) GetTableName() string { return "" }

// GetExecShards lists all the shards that would be accessed by this primitive
func (ms *MergeSort) GetExecShards(vcursor VCursor, bindVars map[string]*querypb.BindVariable, each func(rs *srvtopo.ResolvedShard)) error {
for _, merge := range ms.Primitives {
if primitive, ok := merge.(Primitive); ok {
if err := primitive.GetExecShards(vcursor, bindVars, each); err != nil {
return err
}
}
}
return nil
}

// Execute is not supported.
func (ms *MergeSort) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] Execute is not reachable")
Expand Down
13 changes: 13 additions & 0 deletions go/vt/vtgate/engine/mstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"vitess.io/vitess/go/vt/key"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
)
Expand Down Expand Up @@ -58,6 +59,18 @@ func (m *MStream) GetTableName() string {
return m.TableName
}

// GetExecShards lists all the shards that would be accessed by this primitive
func (m *MStream) GetExecShards(vcursor VCursor, bindVars map[string]*querypb.BindVariable, each func(rs *srvtopo.ResolvedShard)) error {
rss, _, err := vcursor.ResolveDestinations(m.Keyspace.Name, nil, []key.Destination{m.TargetDestination})
if err != nil {
return err
}
for _, rs := range rss {
each(rs)
}
return nil
}

// Execute implements the Primitive interface
func (m *MStream) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
return nil, vterrors.New(vtrpcpb.Code_INTERNAL, "[BUG] 'Execute' called for Stream")
Expand Down
Loading