Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions go/vt/vtgate/engine/concatenate.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func formatTwoOptionsNicely(a, b string) string {
var errWrongNumberOfColumnsInSelect = vterrors.NewErrorf(vtrpcpb.Code_FAILED_PRECONDITION, vterrors.WrongNumberOfColumnsInSelect, "The used SELECT statements have a different number of columns")

// Execute performs a non-streaming exec.
func (c *Concatenate) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
func (c *Concatenate) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
res, err := c.execSources(vcursor, bindVars, wantfields)
if err != nil {
return nil, err
Expand Down Expand Up @@ -124,7 +124,7 @@ func (c *Concatenate) execSources(vcursor VCursor, bindVars map[string]*querypb.
for i, source := range c.Sources {
currIndex, currSource := i, source
g.Go(func() error {
result, err := currSource.Execute(vcursor, bindVars, wantfields)
result, err := vcursor.ExecutePrimitive(currSource, bindVars, wantfields)
if err != nil {
return err
}
Expand All @@ -140,7 +140,7 @@ func (c *Concatenate) execSources(vcursor VCursor, bindVars map[string]*querypb.
}

// StreamExecute performs a streaming exec.
func (c *Concatenate) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
func (c *Concatenate) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
var seenFields []*querypb.Field
var fieldset sync.WaitGroup
var cbMu sync.Mutex
Expand All @@ -154,7 +154,7 @@ func (c *Concatenate) StreamExecute(vcursor VCursor, bindVars map[string]*queryp
currIndex, currSource := i, source

g.Go(func() error {
err := currSource.StreamExecute(vcursor, bindVars, wantfields, func(resultChunk *sqltypes.Result) error {
err := vcursor.StreamExecutePrimitive(currSource, bindVars, wantfields, func(resultChunk *sqltypes.Result) error {
// if we have fields to compare, make sure all the fields are all the same
if currIndex == 0 && !fieldsSent {
defer fieldset.Done()
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/concatenate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestConcatenate_NoErrors(t *testing.T) {
}

t.Run(tc.testName+"-Execute", func(t *testing.T) {
qr, err := concatenate.Execute(&noopVCursor{ctx: context.Background()}, nil, true)
qr, err := concatenate.TryExecute(&noopVCursor{ctx: context.Background()}, nil, true)
if tc.expectedError == "" {
require.NoError(t, err)
require.Equal(t, tc.expectedResult, qr)
Expand Down Expand Up @@ -129,7 +129,7 @@ func TestConcatenate_WithErrors(t *testing.T) {
},
}
ctx := context.Background()
_, err := concatenate.Execute(&noopVCursor{ctx: ctx}, nil, true)
_, err := concatenate.TryExecute(&noopVCursor{ctx: ctx}, nil, true)
require.EqualError(t, err, strFailed)

_, err = wrapStreamExecute(concatenate, &noopVCursor{ctx: ctx}, nil, true)
Expand All @@ -142,7 +142,7 @@ func TestConcatenate_WithErrors(t *testing.T) {
&fakePrimitive{results: []*sqltypes.Result{fake, fake}},
},
}
_, err = concatenate.Execute(&noopVCursor{ctx: ctx}, nil, true)
_, err = concatenate.TryExecute(&noopVCursor{ctx: ctx}, nil, true)
require.EqualError(t, err, strFailed)
_, err = wrapStreamExecute(concatenate, &noopVCursor{ctx: ctx}, nil, true)
require.EqualError(t, err, strFailed)
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/engine/dbddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (c *DBDDL) GetTableName() string {
}

// Execute implements the Primitive interface
func (c *DBDDL) Execute(vcursor VCursor, _ map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
func (c *DBDDL) TryExecute(vcursor VCursor, _ map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
name := vcursor.GetDBDDLPluginName()
plugin, ok := databaseCreatorPlugins[name]
if !ok {
Expand Down Expand Up @@ -181,8 +181,8 @@ func (c *DBDDL) dropDatabase(vcursor VCursor, plugin DBDDLPlugin) (*sqltypes.Res
}

// StreamExecute implements the Primitive interface
func (c *DBDDL) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
res, err := c.Execute(vcursor, bindVars, wantfields)
func (c *DBDDL) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
res, err := c.TryExecute(vcursor, bindVars, wantfields)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/engine/dbddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestDBDDLCreateExecute(t *testing.T) {

vc := &loggingVCursor{dbDDLPlugin: pluginName}

_, err := primitive.Execute(vc, nil, false)
_, err := primitive.TryExecute(vc, nil, false)
require.NoError(t, err)
require.True(t, plugin.createCalled)
require.False(t, plugin.dropCalled)
Expand All @@ -61,7 +61,7 @@ func TestDBDDLDropExecute(t *testing.T) {

vc := &loggingVCursor{dbDDLPlugin: pluginName, ksAvailable: false}

_, err := primitive.Execute(vc, nil, false)
_, err := primitive.TryExecute(vc, nil, false)
require.NoError(t, err)
require.False(t, plugin.createCalled)
require.True(t, plugin.dropCalled)
Expand All @@ -74,11 +74,11 @@ func TestDBDDLTimeout(t *testing.T) {

primitive := &DBDDL{name: "ks", create: true, queryTimeout: 100}
vc := &loggingVCursor{dbDDLPlugin: pluginName, shardErr: fmt.Errorf("db not available")}
_, err := primitive.Execute(vc, nil, false)
_, err := primitive.TryExecute(vc, nil, false)
assert.EqualError(t, err, "could not validate create database: destination not resolved")

primitive = &DBDDL{name: "ks", queryTimeout: 100}
vc = &loggingVCursor{dbDDLPlugin: pluginName, ksAvailable: true}
_, err = primitive.Execute(vc, nil, false)
_, err = primitive.TryExecute(vc, nil, false)
assert.EqualError(t, err, "could not validate drop database: keyspace still available in vschema")
}
12 changes: 6 additions & 6 deletions go/vt/vtgate/engine/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,11 @@ func (ddl *DDL) isOnlineSchemaDDL() bool {
}

// Execute implements the Primitive interface
func (ddl *DDL) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool) (result *sqltypes.Result, err error) {
func (ddl *DDL) TryExecute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool) (result *sqltypes.Result, err error) {
if ddl.CreateTempTable {
vcursor.Session().HasCreatedTempTable()
vcursor.Session().NeedsReservedConn()
return ddl.NormalDDL.Execute(vcursor, bindVars, wantfields)
return vcursor.ExecutePrimitive(ddl.NormalDDL, bindVars, wantfields)
}

ddlStrategySetting, err := schema.ParseDDLStrategy(vcursor.Session().GetDDLStrategy())
Expand All @@ -104,18 +104,18 @@ func (ddl *DDL) Execute(vcursor VCursor, bindVars map[string]*query.BindVariable
if !ddl.OnlineDDLEnabled {
return nil, schema.ErrOnlineDDLDisabled
}
return ddl.OnlineDDL.Execute(vcursor, bindVars, wantfields)
return vcursor.ExecutePrimitive(ddl.OnlineDDL, bindVars, wantfields)
default: // non online-ddl
if !ddl.DirectDDLEnabled {
return nil, schema.ErrDirectDDLDisabled
}
return ddl.NormalDDL.Execute(vcursor, bindVars, wantfields)
return vcursor.ExecutePrimitive(ddl.NormalDDL, bindVars, wantfields)
}
}

// StreamExecute implements the Primitive interface
func (ddl *DDL) StreamExecute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
results, err := ddl.Execute(vcursor, bindVars, wantfields)
func (ddl *DDL) TryStreamExecute(vcursor VCursor, bindVars map[string]*query.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
results, err := ddl.TryExecute(vcursor, bindVars, wantfields)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (del *Delete) GetTableName() string {
}

// Execute performs a non-streaming exec.
func (del *Delete) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
func (del *Delete) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, _ bool) (*sqltypes.Result, error) {
if del.QueryTimeout != 0 {
cancel := vcursor.SetContextTimeout(time.Duration(del.QueryTimeout) * time.Millisecond)
defer cancel()
Expand All @@ -93,7 +93,7 @@ func (del *Delete) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVar
}

// StreamExecute performs a streaming exec.
func (del *Delete) StreamExecute(VCursor, map[string]*querypb.BindVariable, bool, func(*sqltypes.Result) error) error {
func (del *Delete) TryStreamExecute(VCursor, map[string]*querypb.BindVariable, bool, func(*sqltypes.Result) error) error {
return fmt.Errorf("query %q cannot be used for streaming", del.Query)
}

Expand Down
32 changes: 16 additions & 16 deletions go/vt/vtgate/engine/delete_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestDeleteUnsharded(t *testing.T) {
}

vc := newDMLTestVCursor("0")
_, err := del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err := del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationAllShards()`,
Expand All @@ -50,11 +50,11 @@ func TestDeleteUnsharded(t *testing.T) {

// Failure cases
vc = &loggingVCursor{shardErr: errors.New("shard_error")}
_, err = del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err = del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.EqualError(t, err, "shard_error")

vc = &loggingVCursor{}
_, err = del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err = del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.EqualError(t, err, "cannot send query to multiple shards for un-sharded database: []")
}

Expand All @@ -74,7 +74,7 @@ func TestDeleteEqual(t *testing.T) {
}

vc := newDMLTestVCursor("-20", "20-")
_, err := del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err := del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations ks [] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`,
Expand All @@ -83,7 +83,7 @@ func TestDeleteEqual(t *testing.T) {

// Failure case
del.Values = []sqltypes.PlanValue{{Key: "aa"}}
_, err = del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err = del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.EqualError(t, err, "missing bind var aa")
}

Expand All @@ -107,7 +107,7 @@ func TestDeleteEqualNoRoute(t *testing.T) {
}

vc := newDMLTestVCursor("0")
_, err := del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err := del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
// This lookup query will return no rows. So, the DML will not be sent anywhere.
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestDeleteEqualNoScatter(t *testing.T) {
}

vc := newDMLTestVCursor("0")
_, err := del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err := del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.EqualError(t, err, "cannot map vindex to unique keyspace id: DestinationKeyRange(-)")
}

Expand Down Expand Up @@ -166,7 +166,7 @@ func TestDeleteOwnedVindex(t *testing.T) {
vc := newDMLTestVCursor("-20", "20-")
vc.results = results

_, err := del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err := del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations sharded [] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`,
Expand All @@ -182,7 +182,7 @@ func TestDeleteOwnedVindex(t *testing.T) {

// No rows changing
vc = newDMLTestVCursor("-20", "20-")
_, err = del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err = del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations sharded [] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`,
Expand All @@ -205,7 +205,7 @@ func TestDeleteOwnedVindex(t *testing.T) {
vc = newDMLTestVCursor("-20", "20-")
vc.results = results

_, err = del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err = del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations sharded [] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`,
Expand Down Expand Up @@ -235,7 +235,7 @@ func TestDeleteSharded(t *testing.T) {
}

vc := newDMLTestVCursor("-20", "20-")
_, err := del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err := del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations sharded [] Destinations:DestinationAllShards()`,
Expand All @@ -244,13 +244,13 @@ func TestDeleteSharded(t *testing.T) {

// Failure case
vc = &loggingVCursor{shardErr: errors.New("shard_error")}
_, err = del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err = del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.EqualError(t, err, "shard_error")
}

func TestDeleteNoStream(t *testing.T) {
del := &Delete{}
err := del.StreamExecute(nil, nil, false, nil)
err := del.TryStreamExecute(nil, nil, false, nil)
require.EqualError(t, err, `query "" cannot be used for streaming`)
}

Expand Down Expand Up @@ -278,7 +278,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) {
vc := newDMLTestVCursor("-20", "20-")
vc.results = results

_, err := del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err := del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations sharded [] Destinations:DestinationAllShards()`,
Expand All @@ -295,7 +295,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) {
// No rows changing
vc = newDMLTestVCursor("-20", "20-")

_, err = del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err = del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations sharded [] Destinations:DestinationAllShards()`,
Expand All @@ -318,7 +318,7 @@ func TestDeleteScatterOwnedVindex(t *testing.T) {
vc = newDMLTestVCursor("-20", "20-")
vc.results = results

_, err = del.Execute(vc, map[string]*querypb.BindVariable{}, false)
_, err = del.TryExecute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations sharded [] Destinations:DestinationAllShards()`,
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/engine/distinct.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ func newProbeTable() *probeTable {
}

// Execute implements the Primitive interface
func (d *Distinct) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
input, err := d.Source.Execute(vcursor, bindVars, wantfields)
func (d *Distinct) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
input, err := vcursor.ExecutePrimitive(d.Source, bindVars, wantfields)
if err != nil {
return nil, err
}
Expand All @@ -116,10 +116,10 @@ func (d *Distinct) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVar
}

// StreamExecute implements the Primitive interface
func (d *Distinct) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
func (d *Distinct) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
pt := newProbeTable()

err := d.Source.StreamExecute(vcursor, bindVars, wantfields, func(input *sqltypes.Result) error {
err := vcursor.StreamExecutePrimitive(d.Source, bindVars, wantfields, func(input *sqltypes.Result) error {
result := &sqltypes.Result{
Fields: input.Fields,
InsertID: input.InsertID,
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/engine/distinct_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestDistinct(t *testing.T) {
t.Run(tc.testName+"-Execute", func(t *testing.T) {
distinct := &Distinct{Source: &fakePrimitive{results: []*sqltypes.Result{tc.inputs}}}

qr, err := distinct.Execute(&noopVCursor{ctx: context.Background()}, nil, true)
qr, err := distinct.TryExecute(&noopVCursor{ctx: context.Background()}, nil, true)
if tc.expectedError == "" {
require.NoError(t, err)
got := fmt.Sprintf("%v", qr.Rows)
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtgate/engine/fake_primitive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (f *fakePrimitive) GetTableName() string {
return "fakeTable"
}

func (f *fakePrimitive) Execute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
func (f *fakePrimitive) TryExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
f.log = append(f.log, fmt.Sprintf("Execute %v %v", printBindVars(bindVars), wantfields))
if f.results == nil {
return nil, f.sendErr
Expand All @@ -77,7 +77,7 @@ func (f *fakePrimitive) Execute(vcursor VCursor, bindVars map[string]*querypb.Bi
return r, nil
}

func (f *fakePrimitive) StreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
func (f *fakePrimitive) TryStreamExecute(vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
f.log = append(f.log, fmt.Sprintf("StreamExecute %v %v", printBindVars(bindVars), wantfields))
if f.results == nil {
return f.sendErr
Expand Down Expand Up @@ -112,7 +112,7 @@ func (f *fakePrimitive) StreamExecute(vcursor VCursor, bindVars map[string]*quer

func (f *fakePrimitive) GetFields(vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
f.log = append(f.log, fmt.Sprintf("GetFields %v", printBindVars(bindVars)))
return f.Execute(vcursor, bindVars, true /* wantfields */)
return f.TryExecute(vcursor, bindVars, true /* wantfields */)
}

func (f *fakePrimitive) ExpectLog(t *testing.T, want []string) {
Expand All @@ -128,7 +128,7 @@ func (f *fakePrimitive) NeedsTransaction() bool {

func wrapStreamExecute(prim Primitive, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
var result *sqltypes.Result
err := prim.StreamExecute(vcursor, bindVars, wantfields, func(r *sqltypes.Result) error {
err := prim.TryStreamExecute(vcursor, bindVars, wantfields, func(r *sqltypes.Result) error {
if result == nil {
result = r
} else {
Expand Down
Loading