diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 2cf8973fe0e..eb3355d03b8 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -1595,11 +1595,11 @@ func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession, } // ExecuteMultiShard implements the IExecutor interface -func (e *Executor) ExecuteMultiShard(ctx context.Context, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, tabletType topodatapb.TabletType, session *SafeSession, notInTransaction bool, autocommit bool) (qr *sqltypes.Result, errs []error) { - return e.scatterConn.ExecuteMultiShard(ctx, rss, queries, tabletType, session, notInTransaction, autocommit) +func (e *Executor) ExecuteMultiShard(ctx context.Context, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, notInTransaction bool, autocommit bool) (qr *sqltypes.Result, errs []error) { + return e.scatterConn.ExecuteMultiShard(ctx, rss, queries, session, notInTransaction, autocommit) } // StreamExecuteMulti implements the IExecutor interface -func (e *Executor) StreamExecuteMulti(ctx context.Context, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, tabletType topodatapb.TabletType, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error) error { - return e.scatterConn.StreamExecuteMulti(ctx, query, rss, vars, tabletType, options, callback) +func (e *Executor) StreamExecuteMulti(ctx context.Context, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error) error { + return e.scatterConn.StreamExecuteMulti(ctx, query, rss, vars, options, callback) } diff --git a/go/vt/vtgate/resolver.go b/go/vt/vtgate/resolver.go index 23367fbab26..25cedde9ab9 100644 --- a/go/vt/vtgate/resolver.go +++ b/go/vt/vtgate/resolver.go @@ -84,7 +84,6 @@ func (res *Resolver) Execute( sql, bindVars, rss, - tabletType, session, notInTransaction, options, @@ -134,7 +133,6 @@ func (res *Resolver) StreamExecute( sql, bindVars, rss, - tabletType, options, callback) return err diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 663bf20ac41..98aec586c7b 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -29,7 +29,6 @@ import ( "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/discovery" querypb "vitess.io/vitess/go/vt/proto/query" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/srvtopo" @@ -141,7 +140,6 @@ func (stc *ScatterConn) Execute( query string, bindVars map[string]*querypb.BindVariable, rss []*srvtopo.ResolvedShard, - tabletType topodatapb.TabletType, session *SafeSession, notInTransaction bool, options *querypb.ExecuteOptions, @@ -156,7 +154,6 @@ func (stc *ScatterConn) Execute( ctx, "Execute", rss, - tabletType, session, notInTransaction, func(rs *srvtopo.ResolvedShard, i int, shouldBegin bool, transactionID int64) (int64, error) { @@ -204,7 +201,6 @@ func (stc *ScatterConn) ExecuteMultiShard( ctx context.Context, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, - tabletType topodatapb.TabletType, session *SafeSession, notInTransaction bool, autocommit bool, @@ -218,7 +214,6 @@ func (stc *ScatterConn) ExecuteMultiShard( ctx, "Execute", rss, - tabletType, session, notInTransaction, func(rs *srvtopo.ResolvedShard, i int, shouldBegin bool, transactionID int64) (int64, error) { @@ -301,7 +296,6 @@ func (stc *ScatterConn) StreamExecute( query string, bindVars map[string]*querypb.BindVariable, rss []*srvtopo.ResolvedShard, - tabletType topodatapb.TabletType, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error, ) error { @@ -310,7 +304,7 @@ func (stc *ScatterConn) StreamExecute( var mu sync.Mutex fieldSent := false - allErrors := stc.multiGo(ctx, "StreamExecute", rss, tabletType, func(rs *srvtopo.ResolvedShard, i int) error { + allErrors := stc.multiGo("StreamExecute", rss, func(rs *srvtopo.ResolvedShard, i int) error { return rs.QueryService.StreamExecute(ctx, rs.Target, query, bindVars, 0, options, func(qr *sqltypes.Result) error { return stc.processOneStreamingResult(&mu, &fieldSent, qr, callback) }) @@ -328,7 +322,6 @@ func (stc *ScatterConn) StreamExecuteMulti( query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, - tabletType topodatapb.TabletType, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error, ) error { @@ -336,7 +329,7 @@ func (stc *ScatterConn) StreamExecuteMulti( var mu sync.Mutex fieldSent := false - allErrors := stc.multiGo(ctx, "StreamExecute", rss, tabletType, func(rs *srvtopo.ResolvedShard, i int) error { + allErrors := stc.multiGo("StreamExecute", rss, func(rs *srvtopo.ResolvedShard, i int) error { return rs.QueryService.StreamExecute(ctx, rs.Target, query, bindVars[i], 0, options, func(qr *sqltypes.Result) error { return stc.processOneStreamingResult(&mu, &fieldSent, qr, callback) }) @@ -390,7 +383,7 @@ func (stc *ScatterConn) MessageStream(ctx context.Context, rss []*srvtopo.Resolv var mu sync.Mutex fieldSent := false lastErrors := newTimeTracker() - allErrors := stc.multiGo(ctx, "MessageStream", rss, topodatapb.TabletType_MASTER, func(rs *srvtopo.ResolvedShard, i int) error { + allErrors := stc.multiGo("MessageStream", rss, func(rs *srvtopo.ResolvedShard, i int) error { // This loop handles the case where a reparent happens, which can cause // an individual stream to end. If we don't succeed on the retries for // messageStreamGracePeriod, we abort and return an error. @@ -463,10 +456,8 @@ func (stc *ScatterConn) GetHealthCheckCacheStatus() discovery.TabletsCacheStatus // shards in parallel. This does not handle any transaction state. // The action function must match the shardActionFunc2 signature. func (stc *ScatterConn) multiGo( - ctx context.Context, name string, rss []*srvtopo.ResolvedShard, - tabletType topodatapb.TabletType, action shardActionFunc, ) (allErrors *concurrency.AllErrorRecorder) { allErrors = new(concurrency.AllErrorRecorder) @@ -515,7 +506,6 @@ func (stc *ScatterConn) multiGoTransaction( ctx context.Context, name string, rss []*srvtopo.ResolvedShard, - tabletType topodatapb.TabletType, session *SafeSession, notInTransaction bool, action shardActionTransactionFunc, @@ -544,25 +534,23 @@ func (stc *ScatterConn) multiGoTransaction( } } - var wg sync.WaitGroup if numShards == 1 { // only one shard, do it synchronously. for i, rs := range rss { oneShard(rs, i) - goto end } + } else { + var wg sync.WaitGroup + for i, rs := range rss { + wg.Add(1) + go func(rs *srvtopo.ResolvedShard, i int) { + defer wg.Done() + oneShard(rs, i) + }(rs, i) + } + wg.Wait() } - for i, rs := range rss { - wg.Add(1) - go func(rs *srvtopo.ResolvedShard, i int) { - defer wg.Done() - oneShard(rs, i) - }(rs, i) - } - wg.Wait() - -end: if session.MustRollback() { stc.txConn.Rollback(ctx, session) } diff --git a/go/vt/vtgate/scatter_conn_test.go b/go/vt/vtgate/scatter_conn_test.go index b14deea5890..1e508bac31f 100644 --- a/go/vt/vtgate/scatter_conn_test.go +++ b/go/vt/vtgate/scatter_conn_test.go @@ -42,19 +42,19 @@ import ( func TestScatterConnExecute(t *testing.T) { testScatterConnGeneric(t, "TestScatterConnExecute", func(sc *ScatterConn, shards []string) (*sqltypes.Result, error) { res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - rss, err := res.ResolveDestination(context.Background(), "TestScatterConnExecute", topodatapb.TabletType_REPLICA, key.DestinationShards(shards)) + rss, err := res.ResolveDestination(ctx, "TestScatterConnExecute", topodatapb.TabletType_REPLICA, key.DestinationShards(shards)) if err != nil { return nil, err } - return sc.Execute(context.Background(), "query", nil, rss, topodatapb.TabletType_REPLICA, NewSafeSession(nil), false, nil, false) + return sc.Execute(ctx, "query", nil, rss, NewSafeSession(nil), false, nil, false) }) } func TestScatterConnExecuteMulti(t *testing.T) { testScatterConnGeneric(t, "TestScatterConnExecuteMultiShard", func(sc *ScatterConn, shards []string) (*sqltypes.Result, error) { res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - rss, err := res.ResolveDestination(context.Background(), "TestScatterConnExecuteMultiShard", topodatapb.TabletType_REPLICA, key.DestinationShards(shards)) + rss, err := res.ResolveDestination(ctx, "TestScatterConnExecuteMultiShard", topodatapb.TabletType_REPLICA, key.DestinationShards(shards)) if err != nil { return nil, err } @@ -67,7 +67,7 @@ func TestScatterConnExecuteMulti(t *testing.T) { } } - qr, errs := sc.ExecuteMultiShard(context.Background(), rss, queries, topodatapb.TabletType_REPLICA, NewSafeSession(nil), false, false) + qr, errs := sc.ExecuteMultiShard(ctx, rss, queries, NewSafeSession(nil), false, false) return qr, vterrors.Aggregate(errs) }) } @@ -75,13 +75,13 @@ func TestScatterConnExecuteMulti(t *testing.T) { func TestScatterConnStreamExecute(t *testing.T) { testScatterConnGeneric(t, "TestScatterConnStreamExecute", func(sc *ScatterConn, shards []string) (*sqltypes.Result, error) { res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - rss, err := res.ResolveDestination(context.Background(), "TestScatterConnStreamExecute", topodatapb.TabletType_REPLICA, key.DestinationShards(shards)) + rss, err := res.ResolveDestination(ctx, "TestScatterConnStreamExecute", topodatapb.TabletType_REPLICA, key.DestinationShards(shards)) if err != nil { return nil, err } qr := new(sqltypes.Result) - err = sc.StreamExecute(context.Background(), "query", nil, rss, topodatapb.TabletType_REPLICA, nil, func(r *sqltypes.Result) error { + err = sc.StreamExecute(ctx, "query", nil, rss, nil, func(r *sqltypes.Result) error { qr.AppendResult(r) return nil }) @@ -92,13 +92,13 @@ func TestScatterConnStreamExecute(t *testing.T) { func TestScatterConnStreamExecuteMulti(t *testing.T) { testScatterConnGeneric(t, "TestScatterConnStreamExecuteMulti", func(sc *ScatterConn, shards []string) (*sqltypes.Result, error) { res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - rss, err := res.ResolveDestination(context.Background(), "TestScatterConnStreamExecuteMulti", topodatapb.TabletType_REPLICA, key.DestinationShards(shards)) + rss, err := res.ResolveDestination(ctx, "TestScatterConnStreamExecuteMulti", topodatapb.TabletType_REPLICA, key.DestinationShards(shards)) if err != nil { return nil, err } bvs := make([]map[string]*querypb.BindVariable, len(rss)) qr := new(sqltypes.Result) - err = sc.StreamExecuteMulti(context.Background(), "query", rss, bvs, topodatapb.TabletType_REPLICA, nil, func(r *sqltypes.Result) error { + err = sc.StreamExecuteMulti(ctx, "query", rss, bvs, nil, func(r *sqltypes.Result) error { qr.AppendResult(r) return nil }) @@ -248,14 +248,14 @@ func TestMaxMemoryRows(t *testing.T) { sbc1.SetResults([]*sqltypes.Result{tworows, tworows}) res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - rss, _, err := res.ResolveDestinations(context.Background(), "TestMaxMemoryRows", topodatapb.TabletType_REPLICA, nil, + rss, _, err := res.ResolveDestinations(ctx, "TestMaxMemoryRows", topodatapb.TabletType_REPLICA, nil, []key.Destination{key.DestinationShard("0"), key.DestinationShard("1")}) if err != nil { t.Fatalf("ResolveDestination(0) failed: %v", err) } session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - _, err = sc.Execute(context.Background(), "query1", nil, rss, topodatapb.TabletType_REPLICA, session, true, nil, false) + _, err = sc.Execute(ctx, "query1", nil, rss, session, true, nil, false) want := "in-memory row count exceeded allowed limit of 3" if err == nil || err.Error() != want { t.Errorf("Execute(): %v, want %v", err, want) @@ -268,7 +268,7 @@ func TestMaxMemoryRows(t *testing.T) { Sql: "query1", BindVariables: map[string]*querypb.BindVariable{}, }} - _, errs := sc.ExecuteMultiShard(context.Background(), rss, queries, topodatapb.TabletType_REPLICA, session, false, false) + _, errs := sc.ExecuteMultiShard(ctx, rss, queries, session, false, false) err = errs[0] if err == nil || err.Error() != want { t.Errorf("Execute(): %v, want %v", err, want) @@ -313,7 +313,7 @@ func TestMultiExecs(t *testing.T) { }, } - _, _ = sc.ExecuteMultiShard(context.Background(), rss, queries, topodatapb.TabletType_REPLICA, NewSafeSession(nil), false, false) + _, _ = sc.ExecuteMultiShard(ctx, rss, queries, NewSafeSession(nil), false, false) if len(sbc0.Queries) == 0 || len(sbc1.Queries) == 0 { t.Fatalf("didn't get expected query") } @@ -356,7 +356,7 @@ func TestMultiExecs(t *testing.T) { "bv1": sqltypes.Int64BindVariable(1), }, } - _ = sc.StreamExecuteMulti(context.Background(), "query", rss, bvs, topodatapb.TabletType_REPLICA, nil, func(*sqltypes.Result) error { + _ = sc.StreamExecuteMulti(ctx, "query", rss, bvs, nil, func(*sqltypes.Result) error { return nil }) if !reflect.DeepEqual(sbc0.Queries[0].BindVariables, wantVars0) { @@ -373,11 +373,11 @@ func TestScatterConnStreamExecuteSendError(t *testing.T) { sc := newTestScatterConn(hc, new(sandboxTopo), "aa") hc.AddTestTablet("aa", "0", 1, "TestScatterConnStreamExecuteSendError", "0", topodatapb.TabletType_REPLICA, true, 1, nil) res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - rss, err := res.ResolveDestination(context.Background(), "TestScatterConnStreamExecuteSendError", topodatapb.TabletType_REPLICA, key.DestinationShard("0")) + rss, err := res.ResolveDestination(ctx, "TestScatterConnStreamExecuteSendError", topodatapb.TabletType_REPLICA, key.DestinationShard("0")) if err != nil { t.Fatalf("ResolveDestination failed: %v", err) } - err = sc.StreamExecute(context.Background(), "query", nil, rss, topodatapb.TabletType_REPLICA, nil, func(*sqltypes.Result) error { + err = sc.StreamExecute(ctx, "query", nil, rss, nil, func(*sqltypes.Result) error { return fmt.Errorf("send error") }) want := "send error" @@ -398,18 +398,18 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) { sbc1 := hc.AddTestTablet("aa", "1", 1, "TestScatterConnQueryNotInTransaction", "1", topodatapb.TabletType_REPLICA, true, 1, nil) res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - rss0, err := res.ResolveDestination(context.Background(), "TestScatterConnQueryNotInTransaction", topodatapb.TabletType_REPLICA, key.DestinationShard("0")) + rss0, err := res.ResolveDestination(ctx, "TestScatterConnQueryNotInTransaction", topodatapb.TabletType_REPLICA, key.DestinationShard("0")) if err != nil { t.Fatalf("ResolveDestination(0) failed: %v", err) } - rss1, err := res.ResolveDestination(context.Background(), "TestScatterConnQueryNotInTransaction", topodatapb.TabletType_REPLICA, key.DestinationShard("1")) + rss1, err := res.ResolveDestination(ctx, "TestScatterConnQueryNotInTransaction", topodatapb.TabletType_REPLICA, key.DestinationShard("1")) if err != nil { t.Fatalf("ResolveDestination(1) failed: %v", err) } session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.Execute(context.Background(), "query1", nil, rss0, topodatapb.TabletType_REPLICA, session, true, nil, false) - sc.Execute(context.Background(), "query1", nil, rss1, topodatapb.TabletType_REPLICA, session, false, nil, false) + sc.Execute(ctx, "query1", nil, rss0, session, true, nil, false) + sc.Execute(ctx, "query1", nil, rss1, session, false, nil, false) wantSession := vtgatepb.Session{ InTransaction: true, @@ -425,7 +425,7 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) { if !proto.Equal(&wantSession, session.Session) { t.Errorf("want\n%+v\ngot\n%+v", wantSession, *session.Session) } - sc.txConn.Commit(context.Background(), session) + sc.txConn.Commit(ctx, session) { execCount0 := sbc0.ExecCount.Get() execCount1 := sbc1.ExecCount.Get() @@ -449,17 +449,17 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) { session = NewSafeSession(&vtgatepb.Session{InTransaction: true}) res = srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - rss0, err = res.ResolveDestination(context.Background(), "TestScatterConnQueryNotInTransaction", topodatapb.TabletType_REPLICA, key.DestinationShard("0")) + rss0, err = res.ResolveDestination(ctx, "TestScatterConnQueryNotInTransaction", topodatapb.TabletType_REPLICA, key.DestinationShard("0")) if err != nil { t.Fatalf("ResolveDestination(0) failed: %v", err) } - rss1, err = res.ResolveDestination(context.Background(), "TestScatterConnQueryNotInTransaction", topodatapb.TabletType_REPLICA, key.DestinationShard("1")) + rss1, err = res.ResolveDestination(ctx, "TestScatterConnQueryNotInTransaction", topodatapb.TabletType_REPLICA, key.DestinationShard("1")) if err != nil { t.Fatalf("ResolveDestination(1) failed: %v", err) } - sc.Execute(context.Background(), "query1", nil, rss0, topodatapb.TabletType_REPLICA, session, false, nil, false) - sc.Execute(context.Background(), "query1", nil, rss1, topodatapb.TabletType_REPLICA, session, true, nil, false) + sc.Execute(ctx, "query1", nil, rss0, session, false, nil, false) + sc.Execute(ctx, "query1", nil, rss1, session, true, nil, false) wantSession = vtgatepb.Session{ InTransaction: true, @@ -475,7 +475,7 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) { if !proto.Equal(&wantSession, session.Session) { t.Errorf("want\n%+v\ngot\n%+v", wantSession, *session.Session) } - sc.txConn.Commit(context.Background(), session) + sc.txConn.Commit(ctx, session) { execCount0 := sbc0.ExecCount.Get() execCount1 := sbc1.ExecCount.Get() @@ -499,17 +499,17 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) { session = NewSafeSession(&vtgatepb.Session{InTransaction: true}) res = srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - rss0, err = res.ResolveDestination(context.Background(), "TestScatterConnQueryNotInTransaction", topodatapb.TabletType_REPLICA, key.DestinationShard("0")) + rss0, err = res.ResolveDestination(ctx, "TestScatterConnQueryNotInTransaction", topodatapb.TabletType_REPLICA, key.DestinationShard("0")) if err != nil { t.Fatalf("ResolveDestination(0) failed: %v", err) } - rss1, err = res.ResolveDestination(context.Background(), "TestScatterConnQueryNotInTransaction", topodatapb.TabletType_REPLICA, key.DestinationShards([]string{"0", "1"})) + rss1, err = res.ResolveDestination(ctx, "TestScatterConnQueryNotInTransaction", topodatapb.TabletType_REPLICA, key.DestinationShards([]string{"0", "1"})) if err != nil { t.Fatalf("ResolveDestination(1) failed: %v", err) } - sc.Execute(context.Background(), "query1", nil, rss0, topodatapb.TabletType_REPLICA, session, false, nil, false) - sc.Execute(context.Background(), "query1", nil, rss1, topodatapb.TabletType_REPLICA, session, true, nil, false) + sc.Execute(ctx, "query1", nil, rss0, session, false, nil, false) + sc.Execute(ctx, "query1", nil, rss1, session, true, nil, false) wantSession = vtgatepb.Session{ InTransaction: true, @@ -525,7 +525,7 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) { if !proto.Equal(&wantSession, session.Session) { t.Errorf("want\n%+v\ngot\n%+v", wantSession, *session.Session) } - sc.txConn.Commit(context.Background(), session) + sc.txConn.Commit(ctx, session) { execCount0 := sbc0.ExecCount.Get() execCount1 := sbc1.ExecCount.Get() @@ -551,11 +551,11 @@ func TestScatterConnSingleDB(t *testing.T) { hc.AddTestTablet("aa", "1", 1, "TestScatterConnSingleDB", "1", topodatapb.TabletType_MASTER, true, 1, nil) res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - rss0, err := res.ResolveDestination(context.Background(), "TestScatterConnSingleDB", topodatapb.TabletType_MASTER, key.DestinationShard("0")) + rss0, err := res.ResolveDestination(ctx, "TestScatterConnSingleDB", topodatapb.TabletType_MASTER, key.DestinationShard("0")) if err != nil { t.Fatalf("ResolveDestination(0) failed: %v", err) } - rss1, err := res.ResolveDestination(context.Background(), "TestScatterConnSingleDB", topodatapb.TabletType_MASTER, key.DestinationShard("1")) + rss1, err := res.ResolveDestination(ctx, "TestScatterConnSingleDB", topodatapb.TabletType_MASTER, key.DestinationShard("1")) if err != nil { t.Fatalf("ResolveDestination(1) failed: %v", err) } @@ -564,18 +564,18 @@ func TestScatterConnSingleDB(t *testing.T) { // SingleDb (legacy) session := NewSafeSession(&vtgatepb.Session{InTransaction: true, SingleDb: true}) - _, err = sc.Execute(context.Background(), "query1", nil, rss0, topodatapb.TabletType_MASTER, session, false, nil, false) + _, err = sc.Execute(ctx, "query1", nil, rss0, session, false, nil, false) require.NoError(t, err) - _, err = sc.Execute(context.Background(), "query1", nil, rss1, topodatapb.TabletType_MASTER, session, false, nil, false) + _, err = sc.Execute(ctx, "query1", nil, rss1, session, false, nil, false) if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("Multi DB exec: %v, must contain %s", err, want) } // TransactionMode_SINGLE in session session = NewSafeSession(&vtgatepb.Session{InTransaction: true, TransactionMode: vtgatepb.TransactionMode_SINGLE}) - _, err = sc.Execute(context.Background(), "query1", nil, rss0, topodatapb.TabletType_MASTER, session, false, nil, false) + _, err = sc.Execute(ctx, "query1", nil, rss0, session, false, nil, false) require.NoError(t, err) - _, err = sc.Execute(context.Background(), "query1", nil, rss1, topodatapb.TabletType_MASTER, session, false, nil, false) + _, err = sc.Execute(ctx, "query1", nil, rss1, session, false, nil, false) if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("Multi DB exec: %v, must contain %s", err, want) } @@ -583,9 +583,9 @@ func TestScatterConnSingleDB(t *testing.T) { // TransactionMode_SINGLE in txconn sc.txConn.mode = vtgatepb.TransactionMode_SINGLE session = NewSafeSession(&vtgatepb.Session{InTransaction: true}) - _, err = sc.Execute(context.Background(), "query1", nil, rss0, topodatapb.TabletType_MASTER, session, false, nil, false) + _, err = sc.Execute(ctx, "query1", nil, rss0, session, false, nil, false) require.NoError(t, err) - _, err = sc.Execute(context.Background(), "query1", nil, rss1, topodatapb.TabletType_MASTER, session, false, nil, false) + _, err = sc.Execute(ctx, "query1", nil, rss1, session, false, nil, false) if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("Multi DB exec: %v, must contain %s", err, want) } @@ -593,9 +593,9 @@ func TestScatterConnSingleDB(t *testing.T) { // TransactionMode_MULTI in txconn. Should not fail. sc.txConn.mode = vtgatepb.TransactionMode_MULTI session = NewSafeSession(&vtgatepb.Session{InTransaction: true}) - _, err = sc.Execute(context.Background(), "query1", nil, rss0, topodatapb.TabletType_MASTER, session, false, nil, false) + _, err = sc.Execute(ctx, "query1", nil, rss0, session, false, nil, false) require.NoError(t, err) - _, err = sc.Execute(context.Background(), "query1", nil, rss1, topodatapb.TabletType_MASTER, session, false, nil, false) + _, err = sc.Execute(ctx, "query1", nil, rss1, session, false, nil, false) require.NoError(t, err) } @@ -652,7 +652,9 @@ func newTestScatterConn(hc discovery.LegacyHealthCheck, serv srvtopo.Server, cel // The topo.Server is used to start watching the cells described // in '-cells_to_watch' command line parameter, which is // empty by default. So it's unused in this test, set to nil. - gw := GatewayCreator()(context.Background(), hc, serv, cell, 3) + gw := GatewayCreator()(ctx, hc, serv, cell, 3) tc := NewTxConn(gw, vtgatepb.TransactionMode_TWOPC) return LegacyNewScatterConn("", tc, gw, hc) } + +var ctx = context.Background() diff --git a/go/vt/vtgate/tx_conn_test.go b/go/vt/vtgate/tx_conn_test.go index e45d207d946..85ec6da9248 100644 --- a/go/vt/vtgate/tx_conn_test.go +++ b/go/vt/vtgate/tx_conn_test.go @@ -48,7 +48,7 @@ func TestTxConnBegin(t *testing.T) { if !proto.Equal(session, wantSession) { t.Errorf("begin: %v, want %v", session, wantSession) } - if _, err := sc.Execute(context.Background(), "query1", nil, rss0, topodatapb.TabletType_MASTER, NewSafeSession(session), false, nil, false); err != nil { + if _, err := sc.Execute(context.Background(), "query1", nil, rss0, NewSafeSession(session), false, nil, false); err != nil { t.Error(err) } @@ -70,7 +70,7 @@ func TestTxConnCommitSuccess(t *testing.T) { // Sequence the executes to ensure commit order session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.Execute(context.Background(), "query1", nil, rss0, topodatapb.TabletType_MASTER, session, false, nil, false) + sc.Execute(context.Background(), "query1", nil, rss0, session, false, nil, false) wantSession := vtgatepb.Session{ InTransaction: true, ShardSessions: []*vtgatepb.Session_ShardSession{{ @@ -85,7 +85,7 @@ func TestTxConnCommitSuccess(t *testing.T) { if !proto.Equal(session.Session, &wantSession) { t.Errorf("Session:\n%+v, want\n%+v", *session.Session, wantSession) } - sc.Execute(context.Background(), "query1", nil, rss01, topodatapb.TabletType_MASTER, session, false, nil, false) + sc.Execute(context.Background(), "query1", nil, rss01, session, false, nil, false) wantSession = vtgatepb.Session{ InTransaction: true, ShardSessions: []*vtgatepb.Session_ShardSession{{ @@ -133,13 +133,13 @@ func TestTxConnCommitOrderFailure1(t *testing.T) { // Sequence the executes to ensure commit order session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(context.Background(), rss0, queries, topodatapb.TabletType_MASTER, session, false, false) + sc.ExecuteMultiShard(context.Background(), rss0, queries, session, false, false) session.SetCommitOrder(vtgatepb.CommitOrder_PRE) - sc.ExecuteMultiShard(context.Background(), rss0, queries, topodatapb.TabletType_MASTER, session, false, false) + sc.ExecuteMultiShard(context.Background(), rss0, queries, session, false, false) session.SetCommitOrder(vtgatepb.CommitOrder_POST) - sc.ExecuteMultiShard(context.Background(), rss1, queries, topodatapb.TabletType_MASTER, session, false, false) + sc.ExecuteMultiShard(context.Background(), rss1, queries, session, false, false) sbc0.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 err := sc.txConn.Commit(context.Background(), session) @@ -173,13 +173,13 @@ func TestTxConnCommitOrderFailure2(t *testing.T) { // Sequence the executes to ensure commit order session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(context.Background(), rss1, queries, topodatapb.TabletType_MASTER, session, false, false) + sc.ExecuteMultiShard(context.Background(), rss1, queries, session, false, false) session.SetCommitOrder(vtgatepb.CommitOrder_PRE) - sc.ExecuteMultiShard(context.Background(), rss0, queries, topodatapb.TabletType_MASTER, session, false, false) + sc.ExecuteMultiShard(context.Background(), rss0, queries, session, false, false) session.SetCommitOrder(vtgatepb.CommitOrder_POST) - sc.ExecuteMultiShard(context.Background(), rss1, queries, topodatapb.TabletType_MASTER, session, false, false) + sc.ExecuteMultiShard(context.Background(), rss1, queries, session, false, false) sbc1.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 err := sc.txConn.Commit(context.Background(), session) @@ -213,13 +213,13 @@ func TestTxConnCommitOrderFailure3(t *testing.T) { // Sequence the executes to ensure commit order session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(context.Background(), rss0, queries, topodatapb.TabletType_MASTER, session, false, false) + sc.ExecuteMultiShard(context.Background(), rss0, queries, session, false, false) session.SetCommitOrder(vtgatepb.CommitOrder_PRE) - sc.ExecuteMultiShard(context.Background(), rss0, queries, topodatapb.TabletType_MASTER, session, false, false) + sc.ExecuteMultiShard(context.Background(), rss0, queries, session, false, false) session.SetCommitOrder(vtgatepb.CommitOrder_POST) - sc.ExecuteMultiShard(context.Background(), rss1, queries, topodatapb.TabletType_MASTER, session, false, false) + sc.ExecuteMultiShard(context.Background(), rss1, queries, session, false, false) sbc1.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 if err := sc.txConn.Commit(context.Background(), session); err != nil { @@ -253,7 +253,7 @@ func TestTxConnCommitOrderSuccess(t *testing.T) { // Sequence the executes to ensure commit order session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(context.Background(), rss0, queries, topodatapb.TabletType_MASTER, session, false, false) + sc.ExecuteMultiShard(context.Background(), rss0, queries, session, false, false) wantSession := vtgatepb.Session{ InTransaction: true, ShardSessions: []*vtgatepb.Session_ShardSession{{ @@ -270,7 +270,7 @@ func TestTxConnCommitOrderSuccess(t *testing.T) { } session.SetCommitOrder(vtgatepb.CommitOrder_PRE) - sc.ExecuteMultiShard(context.Background(), rss0, queries, topodatapb.TabletType_MASTER, session, false, false) + sc.ExecuteMultiShard(context.Background(), rss0, queries, session, false, false) wantSession = vtgatepb.Session{ InTransaction: true, PreSessions: []*vtgatepb.Session_ShardSession{{ @@ -295,7 +295,7 @@ func TestTxConnCommitOrderSuccess(t *testing.T) { } session.SetCommitOrder(vtgatepb.CommitOrder_POST) - sc.ExecuteMultiShard(context.Background(), rss1, queries, topodatapb.TabletType_MASTER, session, false, false) + sc.ExecuteMultiShard(context.Background(), rss1, queries, session, false, false) wantSession = vtgatepb.Session{ InTransaction: true, PreSessions: []*vtgatepb.Session_ShardSession{{ @@ -328,7 +328,7 @@ func TestTxConnCommitOrderSuccess(t *testing.T) { } // Ensure nothing changes if we reuse a transaction. - sc.ExecuteMultiShard(context.Background(), rss1, queries, topodatapb.TabletType_MASTER, session, false, false) + sc.ExecuteMultiShard(context.Background(), rss1, queries, session, false, false) if !proto.Equal(session.Session, &wantSession) { t.Errorf("Session:\n%+v, want\n%+v", *session.Session, wantSession) } @@ -352,8 +352,8 @@ func TestTxConnCommit2PC(t *testing.T) { sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, "TestTxConnCommit2PC") session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.Execute(context.Background(), "query1", nil, rss0, topodatapb.TabletType_MASTER, session, false, nil, false) - sc.Execute(context.Background(), "query1", nil, rss01, topodatapb.TabletType_MASTER, session, false, nil, false) + sc.Execute(context.Background(), "query1", nil, rss0, session, false, nil, false) + sc.Execute(context.Background(), "query1", nil, rss01, session, false, nil, false) session.TransactionMode = vtgatepb.TransactionMode_TWOPC if err := sc.txConn.Commit(context.Background(), session); err != nil { t.Error(err) @@ -378,7 +378,7 @@ func TestTxConnCommit2PC(t *testing.T) { func TestTxConnCommit2PCOneParticipant(t *testing.T) { sc, sbc0, _, rss0, _, _ := newTestTxConnEnv(t, "TestTxConnCommit2PCOneParticipant") session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.Execute(context.Background(), "query1", nil, rss0, topodatapb.TabletType_MASTER, session, false, nil, false) + sc.Execute(context.Background(), "query1", nil, rss0, session, false, nil, false) session.TransactionMode = vtgatepb.TransactionMode_TWOPC if err := sc.txConn.Commit(context.Background(), session); err != nil { t.Error(err) @@ -392,8 +392,8 @@ func TestTxConnCommit2PCCreateTransactionFail(t *testing.T) { sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, "TestTxConnCommit2PCCreateTransactionFail") session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.Execute(context.Background(), "query1", nil, rss0, topodatapb.TabletType_MASTER, session, false, nil, false) - sc.Execute(context.Background(), "query1", nil, rss1, topodatapb.TabletType_MASTER, session, false, nil, false) + sc.Execute(context.Background(), "query1", nil, rss0, session, false, nil, false) + sc.Execute(context.Background(), "query1", nil, rss1, session, false, nil, false) sbc0.MustFailCreateTransaction = 1 session.TransactionMode = vtgatepb.TransactionMode_TWOPC @@ -429,8 +429,8 @@ func TestTxConnCommit2PCPrepareFail(t *testing.T) { sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, "TestTxConnCommit2PCPrepareFail") session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.Execute(context.Background(), "query1", nil, rss0, topodatapb.TabletType_MASTER, session, false, nil, false) - sc.Execute(context.Background(), "query1", nil, rss01, topodatapb.TabletType_MASTER, session, false, nil, false) + sc.Execute(context.Background(), "query1", nil, rss0, session, false, nil, false) + sc.Execute(context.Background(), "query1", nil, rss01, session, false, nil, false) sbc1.MustFailPrepare = 1 session.TransactionMode = vtgatepb.TransactionMode_TWOPC @@ -460,8 +460,8 @@ func TestTxConnCommit2PCStartCommitFail(t *testing.T) { sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, "TestTxConnCommit2PCStartCommitFail") session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.Execute(context.Background(), "query1", nil, rss0, topodatapb.TabletType_MASTER, session, false, nil, false) - sc.Execute(context.Background(), "query1", nil, rss01, topodatapb.TabletType_MASTER, session, false, nil, false) + sc.Execute(context.Background(), "query1", nil, rss0, session, false, nil, false) + sc.Execute(context.Background(), "query1", nil, rss01, session, false, nil, false) sbc0.MustFailStartCommit = 1 session.TransactionMode = vtgatepb.TransactionMode_TWOPC @@ -491,8 +491,8 @@ func TestTxConnCommit2PCCommitPreparedFail(t *testing.T) { sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, "TestTxConnCommit2PCCommitPreparedFail") session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.Execute(context.Background(), "query1", nil, rss0, topodatapb.TabletType_MASTER, session, false, nil, false) - sc.Execute(context.Background(), "query1", nil, rss01, topodatapb.TabletType_MASTER, session, false, nil, false) + sc.Execute(context.Background(), "query1", nil, rss0, session, false, nil, false) + sc.Execute(context.Background(), "query1", nil, rss01, session, false, nil, false) sbc1.MustFailCommitPrepared = 1 session.TransactionMode = vtgatepb.TransactionMode_TWOPC @@ -522,8 +522,8 @@ func TestTxConnCommit2PCConcludeTransactionFail(t *testing.T) { sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, "TestTxConnCommit2PCConcludeTransactionFail") session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.Execute(context.Background(), "query1", nil, rss0, topodatapb.TabletType_MASTER, session, false, nil, false) - sc.Execute(context.Background(), "query1", nil, rss01, topodatapb.TabletType_MASTER, session, false, nil, false) + sc.Execute(context.Background(), "query1", nil, rss0, session, false, nil, false) + sc.Execute(context.Background(), "query1", nil, rss01, session, false, nil, false) sbc0.MustFailConcludeTransaction = 1 session.TransactionMode = vtgatepb.TransactionMode_TWOPC @@ -553,8 +553,8 @@ func TestTxConnRollback(t *testing.T) { sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, "TxConnRollback") session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.Execute(context.Background(), "query1", nil, rss0, topodatapb.TabletType_MASTER, session, false, nil, false) - sc.Execute(context.Background(), "query1", nil, rss01, topodatapb.TabletType_MASTER, session, false, nil, false) + sc.Execute(context.Background(), "query1", nil, rss0, session, false, nil, false) + sc.Execute(context.Background(), "query1", nil, rss01, session, false, nil, false) if err := sc.txConn.Rollback(context.Background(), session); err != nil { t.Error(err) } diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 4b07c27e24f..097c7aed267 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -54,8 +54,8 @@ var _ iExecute = (*Executor)(nil) // vcursor_impl needs these facilities to be able to be able to execute queries for vindexes type iExecute interface { Execute(ctx context.Context, method string, session *SafeSession, s string, vars map[string]*querypb.BindVariable) (*sqltypes.Result, error) - ExecuteMultiShard(ctx context.Context, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, tabletType topodatapb.TabletType, session *SafeSession, notInTransaction bool, autocommit bool) (qr *sqltypes.Result, errs []error) - StreamExecuteMulti(ctx context.Context, s string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, tabletType topodatapb.TabletType, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error) error + ExecuteMultiShard(ctx context.Context, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, notInTransaction bool, autocommit bool) (qr *sqltypes.Result, errs []error) + StreamExecuteMulti(ctx context.Context, s string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, options *querypb.ExecuteOptions, callback func(reply *sqltypes.Result) error) error // TODO: remove when resolver is gone ParseDestinationTarget(targetString string) (string, topodatapb.TabletType, key.Destination, error) @@ -273,7 +273,7 @@ func (vc *vcursorImpl) Execute(method string, query string, bindVars map[string] // ExecuteMultiShard is part of the engine.VCursor interface. func (vc *vcursorImpl) ExecuteMultiShard(rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, rollbackOnError, autocommit bool) (*sqltypes.Result, []error) { atomic.AddUint32(&vc.logStats.ShardQueries, uint32(len(queries))) - qr, errs := vc.executor.ExecuteMultiShard(vc.ctx, rss, commentedShardQueries(queries, vc.marginComments), vc.tabletType, vc.safeSession, false, autocommit) + qr, errs := vc.executor.ExecuteMultiShard(vc.ctx, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, false, autocommit) if errs == nil && rollbackOnError { vc.rollbackOnPartialExec = true @@ -297,14 +297,14 @@ func (vc *vcursorImpl) ExecuteStandalone(query string, bindVars map[string]*quer } // The autocommit flag is always set to false because we currently don't // execute DMLs through ExecuteStandalone. - qr, errs := vc.executor.ExecuteMultiShard(vc.ctx, rss, bqs, vc.tabletType, NewAutocommitSession(vc.safeSession.Session), false, false /* autocommit */) + qr, errs := vc.executor.ExecuteMultiShard(vc.ctx, rss, bqs, NewAutocommitSession(vc.safeSession.Session), false, false /* autocommit */) return qr, vterrors.Aggregate(errs) } // StreamExeculteMulti is the streaming version of ExecuteMultiShard. func (vc *vcursorImpl) StreamExecuteMulti(query string, rss []*srvtopo.ResolvedShard, bindVars []map[string]*querypb.BindVariable, callback func(reply *sqltypes.Result) error) error { atomic.AddUint32(&vc.logStats.ShardQueries, uint32(len(rss))) - return vc.executor.StreamExecuteMulti(vc.ctx, vc.marginComments.Leading+query+vc.marginComments.Trailing, rss, bindVars, vc.tabletType, vc.safeSession.Options, callback) + return vc.executor.StreamExecuteMulti(vc.ctx, vc.marginComments.Leading+query+vc.marginComments.Trailing, rss, bindVars, vc.safeSession.Options, callback) } // ExecuteKeyspaceID is part of the engine.VCursor interface. diff --git a/proto/vtgate.proto b/proto/vtgate.proto index 761e478957d..d71de7595f1 100644 --- a/proto/vtgate.proto +++ b/proto/vtgate.proto @@ -116,6 +116,7 @@ message Session { map user_defined_variables = 13; // system_variables keeps track of all session variables set for this connection + // TODO: systay should we keep this so we can apply it ordered? map system_variables = 14; // row_count keeps track of the last seen rows affected for this session