diff --git a/go/vt/discovery/fake_healthcheck.go b/go/vt/discovery/fake_healthcheck.go index ef21b9f1d69..5cc04885530 100644 --- a/go/vt/discovery/fake_healthcheck.go +++ b/go/vt/discovery/fake_healthcheck.go @@ -17,9 +17,13 @@ limitations under the License. package discovery import ( + "context" "sort" "sync" + "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vttablet/queryservice" @@ -30,7 +34,7 @@ import ( ) // This file contains the definitions for a FakeHealthCheck class to -// simulate a LegacyHealthCheck module. Note it is not in a sub-package because +// simulate a HealthCheck module. Note it is not in a sub-package because // otherwise it couldn't be used in this package's tests because of // circular dependencies. @@ -41,7 +45,7 @@ func NewFakeHealthCheck() *FakeHealthCheck { } } -// FakeHealthCheck implements discovery.LegacyHealthCheck. +// FakeHealthCheck implements discovery.HealthCheck. type FakeHealthCheck struct { // mu protects the items map mu sync.RWMutex @@ -54,15 +58,30 @@ type fhcItem struct { } // -// discovery.LegacyHealthCheck interface methods +// discovery.HealthCheck interface methods // // RegisterStats is not implemented. func (fhc *FakeHealthCheck) RegisterStats() { } -// WaitForInitialStatsUpdates is not implemented. -func (fhc *FakeHealthCheck) WaitForInitialStatsUpdates() { +// WaitForAllServingTablets is not implemented. +func (fhc *FakeHealthCheck) WaitForAllServingTablets(ctx context.Context, targets []*querypb.Target) error { + return nil +} + +// GetHealthyTabletStats is not implemented. +func (fhc *FakeHealthCheck) GetHealthyTabletStats(target *querypb.Target) []*TabletHealth { + return nil +} + +// Subscribe is not implemented. +func (fhc *FakeHealthCheck) Subscribe() chan *TabletHealth { + return nil +} + +// Unsubscribe is not implemented. +func (fhc *FakeHealthCheck) Unsubscribe(c chan *TabletHealth) { } // AddTablet adds the tablet. @@ -112,13 +131,14 @@ func (fhc *FakeHealthCheck) ReplaceTablet(old, new *topodatapb.Tablet) { } // TabletConnection returns the TabletConn of the given tablet. -func (fhc *FakeHealthCheck) TabletConnection(key string) queryservice.QueryService { +func (fhc *FakeHealthCheck) TabletConnection(alias *topodatapb.TabletAlias) (queryservice.QueryService, error) { + key := topoproto.TabletAliasString(alias) fhc.mu.RLock() defer fhc.mu.RUnlock() if item := fhc.items[key]; item != nil { - return item.conn + return item.conn, nil } - return nil + return nil, vterrors.New(vtrpc.Code_NOT_FOUND, "tablet not found") } // CacheStatus returns the status for each tablet diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 1f9e08df99f..07dff320a42 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -71,8 +71,6 @@ var ( //TODO(deepthi): change these vars back to unexported when discoveryGateway is removed - // CellsToWatch is the list of cells the healthcheck operates over. If it is empty, only the local cell is watched - CellsToWatch = flag.String("cells_to_watch", "", "comma-separated list of cells for watching tablets") // AllowedTabletTypes is the list of allowed tablet types. e.g. {MASTER, REPLICA} AllowedTabletTypes []topodata.TabletType // TabletFilters are the keyspace|shard or keyrange filters to apply to the full set of tablets @@ -164,6 +162,43 @@ type TabletRecorder interface { type keyspaceShardTabletType string type tabletAliasString string +//HealthCheck declares what the TabletGateway needs from the HealthCheck +type HealthCheck interface { + // CacheStatus returns a displayable version of the health check cache. + CacheStatus() TabletsCacheStatusList + + // Close stops the healthcheck. + Close() error + + // WaitForAllServingTablets waits for at least one healthy serving tablet in + // each given target before returning. + // It will return ctx.Err() if the context is canceled. + // It will return an error if it can't read the necessary topology records. + WaitForAllServingTablets(ctx context.Context, targets []*query.Target) error + + // TabletConnection returns the TabletConn of the given tablet. + TabletConnection(alias *topodata.TabletAlias) (queryservice.QueryService, error) + + // RegisterStats registers the connection counts stats + RegisterStats() + + // GetHealthyTabletStats returns only the healthy tablets. + // The returned array is owned by the caller. + // For TabletType_MASTER, this will only return at most one entry, + // the most recent tablet of type master. + // This returns a copy of the data so that callers can access without + // synchronization + GetHealthyTabletStats(target *query.Target) []*TabletHealth + + // Subscribe adds a listener. Used by vtgate buffer to learn about master changes. + Subscribe() chan *TabletHealth + + // Unsubscribe removes a listener. + Unsubscribe(c chan *TabletHealth) +} + +var _ HealthCheck = (*HealthCheckImpl)(nil) + // HealthCheckImpl performs health checking and stores the results. // The goal of this object is to maintain a StreamHealth RPC // to a lot of tablets. Tablets are added / removed by calling the @@ -220,8 +255,8 @@ type HealthCheckImpl struct { // The localCell for this healthcheck // callback. // A function to call when there is a master change. Used to notify vtgate's buffer to stop buffering. -func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell string) *HealthCheckImpl { - log.Infof("loading tablets for cells: %v", *CellsToWatch) +func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string) *HealthCheckImpl { + log.Infof("loading tablets for cells: %v", cellsToWatch) hc := &HealthCheckImpl{ ts: topoServer, @@ -236,7 +271,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur } var topoWatchers []*TopologyWatcher var filter TabletFilter - cells := strings.Split(*CellsToWatch, ",") + cells := strings.Split(cellsToWatch, ",") if len(cells) == 0 { cells = append(cells, localCell) } diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index c01df132f6b..88db7b9fecb 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -777,7 +777,7 @@ func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservic } func createTestHc(ts *topo.Server) *HealthCheckImpl { - return NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell") + return NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell", "") } type fakeConn struct { diff --git a/go/vt/vtgate/api.go b/go/vt/vtgate/api.go index f6ed78cec00..2446968b85e 100644 --- a/go/vt/vtgate/api.go +++ b/go/vt/vtgate/api.go @@ -87,7 +87,7 @@ func getItemPath(url string) string { return parts[1] } -func initAPI(hc HealthCheck) { +func initAPI(hc discovery.HealthCheck) { // Healthcheck real time status per (cell, keyspace, tablet type, metric). handleCollection("health-check", func(r *http.Request) (interface{}, error) { cacheStatus := hc.CacheStatus() diff --git a/go/vt/vtgate/discoverygateway.go b/go/vt/vtgate/discoverygateway.go index 3fc31218f95..92e89cf9bd4 100644 --- a/go/vt/vtgate/discoverygateway.go +++ b/go/vt/vtgate/discoverygateway.go @@ -120,7 +120,7 @@ func NewDiscoveryGateway(ctx context.Context, hc discovery.LegacyHealthCheck, se // We set sendDownEvents=true because it's required by LegacyTabletStatsCache. hc.SetListener(dg, true /* sendDownEvents */) - cells := *discovery.CellsToWatch + cells := *CellsToWatch log.Infof("loading tablets for cells: %v", cells) for _, c := range strings.Split(cells, ",") { if c == "" { diff --git a/go/vt/vtgate/legacy_scatter_conn_test.go b/go/vt/vtgate/legacy_scatter_conn_test.go new file mode 100644 index 00000000000..93f0164e9b8 --- /dev/null +++ b/go/vt/vtgate/legacy_scatter_conn_test.go @@ -0,0 +1,745 @@ +/* +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtgate + +import ( + "fmt" + "reflect" + "strings" + "testing" + + "vitess.io/vitess/go/test/utils" + + "github.com/stretchr/testify/assert" + + "golang.org/x/net/context" + + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/key" + querypb "vitess.io/vitess/go/vt/proto/query" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" + vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/srvtopo" + "vitess.io/vitess/go/vt/vterrors" +) + +// This file uses the sandbox_test framework. + +func TestLegacyExecuteFailOnAutocommit(t *testing.T) { + + createSandbox("TestExecuteFailOnAutocommit") + hc := discovery.NewFakeLegacyHealthCheck() + sc := newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") + sbc0 := hc.AddTestTablet("aa", "0", 1, "TestExecuteFailOnAutocommit", "0", topodatapb.TabletType_MASTER, true, 1, nil) + sbc1 := hc.AddTestTablet("aa", "1", 1, "TestExecuteFailOnAutocommit", "1", topodatapb.TabletType_MASTER, true, 1, nil) + + rss := []*srvtopo.ResolvedShard{ + { + Target: &querypb.Target{ + Keyspace: "TestExecuteFailOnAutocommit", + Shard: "0", + TabletType: topodatapb.TabletType_MASTER, + }, + Gateway: sbc0, + }, + { + Target: &querypb.Target{ + Keyspace: "TestExecuteFailOnAutocommit", + Shard: "1", + TabletType: topodatapb.TabletType_MASTER, + }, + Gateway: sbc1, + }, + } + queries := []*querypb.BoundQuery{ + { + // This will fail to go to shard. It will be rejected at vtgate. + Sql: "query1", + BindVariables: map[string]*querypb.BindVariable{ + "bv0": sqltypes.Int64BindVariable(0), + }, + }, + { + // This will go to shard. + Sql: "query2", + BindVariables: map[string]*querypb.BindVariable{ + "bv1": sqltypes.Int64BindVariable(1), + }, + }, + } + // shard 0 - has transaction + // shard 1 - does not have transaction. + session := &vtgatepb.Session{ + InTransaction: true, + ShardSessions: []*vtgatepb.Session_ShardSession{ + { + Target: &querypb.Target{Keyspace: "TestExecuteFailOnAutocommit", Shard: "0", TabletType: topodatapb.TabletType_MASTER, Cell: "aa"}, + TransactionId: 123, + TabletAlias: nil, + }, + }, + Autocommit: false, + } + _, errs := sc.ExecuteMultiShard(ctx, rss, queries, NewSafeSession(session), true /*autocommit*/) + err := vterrors.Aggregate(errs) + require.Error(t, err) + require.Contains(t, err.Error(), "in autocommit mode, transactionID should be zero but was: 123") + utils.MustMatch(t, 0, len(sbc0.Queries), "") + utils.MustMatch(t, []*querypb.BoundQuery{queries[1]}, sbc1.Queries, "") +} + +func TestScatterConnExecuteMulti(t *testing.T) { + testScatterConnGeneric(t, "TestScatterConnExecuteMultiShard", func(sc *ScatterConn, shards []string) (*sqltypes.Result, error) { + res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") + rss, err := res.ResolveDestination(ctx, "TestScatterConnExecuteMultiShard", topodatapb.TabletType_REPLICA, key.DestinationShards(shards)) + if err != nil { + return nil, err + } + + queries := make([]*querypb.BoundQuery, len(rss)) + for i := range rss { + queries[i] = &querypb.BoundQuery{ + Sql: "query", + BindVariables: nil, + } + } + + qr, errs := sc.ExecuteMultiShard(ctx, rss, queries, NewSafeSession(nil), false /*autocommit*/) + return qr, vterrors.Aggregate(errs) + }) +} + +func TestScatterConnStreamExecute(t *testing.T) { + testScatterConnGeneric(t, "TestScatterConnStreamExecute", func(sc *ScatterConn, shards []string) (*sqltypes.Result, error) { + res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") + rss, err := res.ResolveDestination(ctx, "TestScatterConnStreamExecute", topodatapb.TabletType_REPLICA, key.DestinationShards(shards)) + if err != nil { + return nil, err + } + + qr := new(sqltypes.Result) + err = sc.StreamExecute(ctx, "query", nil, rss, nil, func(r *sqltypes.Result) error { + qr.AppendResult(r) + return nil + }) + return qr, err + }) +} + +func TestScatterConnStreamExecuteMulti(t *testing.T) { + testScatterConnGeneric(t, "TestScatterConnStreamExecuteMulti", func(sc *ScatterConn, shards []string) (*sqltypes.Result, error) { + res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") + rss, err := res.ResolveDestination(ctx, "TestScatterConnStreamExecuteMulti", topodatapb.TabletType_REPLICA, key.DestinationShards(shards)) + if err != nil { + return nil, err + } + bvs := make([]map[string]*querypb.BindVariable, len(rss)) + qr := new(sqltypes.Result) + err = sc.StreamExecuteMulti(ctx, "query", rss, bvs, nil, func(r *sqltypes.Result) error { + qr.AppendResult(r) + return nil + }) + return qr, err + }) +} + +// verifyScatterConnError checks that a returned error has the expected message, +// type, and error code. +func verifyScatterConnError(t *testing.T, err error, wantErr string, wantCode vtrpcpb.Code) { + t.Helper() + if err == nil || err.Error() != wantErr { + t.Errorf("wanted error: %s, got error: %v", wantErr, err) + } + if code := vterrors.Code(err); code != wantCode { + t.Errorf("wanted error code: %s, got: %v", wantCode, code) + } +} + +func testScatterConnGeneric(t *testing.T, name string, f func(sc *ScatterConn, shards []string) (*sqltypes.Result, error)) { + hc := discovery.NewFakeLegacyHealthCheck() + + // no shard + s := createSandbox(name) + sc := newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") + qr, err := f(sc, nil) + require.NoError(t, err) + if qr.RowsAffected != 0 { + t.Errorf("want 0, got %v", qr.RowsAffected) + } + + // single shard + s.Reset() + sc = newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") + sbc := hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_REPLICA, true, 1, nil) + sbc.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 + _, err = f(sc, []string{"0"}) + want := fmt.Sprintf("target: %v.0.replica, used tablet: aa-0 (0): INVALID_ARGUMENT error", name) + // Verify server error string. + if err == nil || err.Error() != want { + t.Errorf("want %s, got %v", want, err) + } + // Ensure that we tried only once. + if execCount := sbc.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) + } + + // two shards + s.Reset() + hc.Reset() + sc = newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") + sbc0 := hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_REPLICA, true, 1, nil) + sbc1 := hc.AddTestTablet("aa", "1", 1, name, "1", topodatapb.TabletType_REPLICA, true, 1, nil) + sbc0.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 + sbc1.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 + _, err = f(sc, []string{"0", "1"}) + // Verify server errors are consolidated. + want = fmt.Sprintf("target: %v.0.replica, used tablet: aa-0 (0): INVALID_ARGUMENT error\ntarget: %v.1.replica, used tablet: aa-0 (1): INVALID_ARGUMENT error", name, name) + verifyScatterConnError(t, err, want, vtrpcpb.Code_INVALID_ARGUMENT) + // Ensure that we tried only once. + if execCount := sbc0.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) + } + if execCount := sbc1.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) + } + + // two shards with different errors + s.Reset() + hc.Reset() + sc = newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") + sbc0 = hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_REPLICA, true, 1, nil) + sbc1 = hc.AddTestTablet("aa", "1", 1, name, "1", topodatapb.TabletType_REPLICA, true, 1, nil) + sbc0.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 + sbc1.MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1 + _, err = f(sc, []string{"0", "1"}) + // Verify server errors are consolidated. + want = fmt.Sprintf("target: %v.0.replica, used tablet: aa-0 (0): INVALID_ARGUMENT error\ntarget: %v.1.replica, used tablet: aa-0 (1): RESOURCE_EXHAUSTED error", name, name) + // We should only surface the higher priority error code + verifyScatterConnError(t, err, want, vtrpcpb.Code_INVALID_ARGUMENT) + // Ensure that we tried only once. + if execCount := sbc0.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) + } + if execCount := sbc1.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) + } + + // duplicate shards + s.Reset() + hc.Reset() + sc = newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") + sbc = hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_REPLICA, true, 1, nil) + _, _ = f(sc, []string{"0", "0"}) + // Ensure that we executed only once. + if execCount := sbc.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) + } + + // no errors + s.Reset() + hc.Reset() + sc = newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") + sbc0 = hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_REPLICA, true, 1, nil) + sbc1 = hc.AddTestTablet("aa", "1", 1, name, "1", topodatapb.TabletType_REPLICA, true, 1, nil) + qr, err = f(sc, []string{"0", "1"}) + if err != nil { + t.Fatalf("want nil, got %v", err) + } + if execCount := sbc0.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) + } + if execCount := sbc1.ExecCount.Get(); execCount != 1 { + t.Errorf("want 1, got %v", execCount) + } + if qr.RowsAffected != 2 { + t.Errorf("want 2, got %v", qr.RowsAffected) + } + if len(qr.Rows) != 2 { + t.Errorf("want 2, got %v", len(qr.Rows)) + } +} + +func TestMaxMemoryRows(t *testing.T) { + save := *maxMemoryRows + *maxMemoryRows = 3 + defer func() { *maxMemoryRows = save }() + + createSandbox("TestMaxMemoryRows") + hc := discovery.NewFakeLegacyHealthCheck() + sc := newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") + sbc0 := hc.AddTestTablet("aa", "0", 1, "TestMaxMemoryRows", "0", topodatapb.TabletType_REPLICA, true, 1, nil) + sbc1 := hc.AddTestTablet("aa", "1", 1, "TestMaxMemoryRows", "1", topodatapb.TabletType_REPLICA, true, 1, nil) + + tworows := &sqltypes.Result{ + Rows: [][]sqltypes.Value{{ + sqltypes.NewInt64(1), + }, { + sqltypes.NewInt64(1), + }}, + RowsAffected: 1, + InsertID: 1, + } + sbc0.SetResults([]*sqltypes.Result{tworows, tworows}) + sbc1.SetResults([]*sqltypes.Result{tworows, tworows}) + + res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") + rss, _, err := res.ResolveDestinations(ctx, "TestMaxMemoryRows", topodatapb.TabletType_REPLICA, nil, + []key.Destination{key.DestinationShard("0"), key.DestinationShard("1")}) + require.NoError(t, err) + + session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) + + want := "in-memory row count exceeded allowed limit of 3" + queries := []*querypb.BoundQuery{{ + Sql: "query1", + BindVariables: map[string]*querypb.BindVariable{}, + }, { + Sql: "query1", + BindVariables: map[string]*querypb.BindVariable{}, + }} + _, errs := sc.ExecuteMultiShard(ctx, rss, queries, session, false) + assert.EqualError(t, errs[0], want) +} + +func TestReservedBeginTableDriven(t *testing.T) { + type testAction struct { + transaction, reserved bool + shards []string + sbc0Reserve, sbc1Reserve int64 + sbc0Begin, sbc1Begin int64 + } + type testCase struct { + name string + actions []testAction + } + + tests := []testCase{{ + name: "begin", + actions: []testAction{ + { + shards: []string{"0"}, + transaction: true, + sbc0Begin: 1, + }, { + shards: []string{"0", "1"}, + transaction: true, + sbc1Begin: 1, + }, { + shards: []string{"0", "1"}, + transaction: true, + // nothing needs to be done + }}, + }, { + name: "reserve", + actions: []testAction{ + { + shards: []string{"1"}, + reserved: true, + sbc1Reserve: 1, + }, { + shards: []string{"0", "1"}, + reserved: true, + sbc0Reserve: 1, + }, { + shards: []string{"0", "1"}, + reserved: true, + // nothing needs to be done + }}, + }, { + name: "reserve everywhere", + actions: []testAction{ + { + shards: []string{"0", "1"}, + reserved: true, + sbc0Reserve: 1, + sbc1Reserve: 1, + }}, + }, { + name: "begin then reserve", + actions: []testAction{ + { + shards: []string{"0"}, + transaction: true, + sbc0Begin: 1, + }, { + shards: []string{"0", "1"}, + transaction: true, + reserved: true, + sbc0Reserve: 1, + sbc1Reserve: 1, + sbc1Begin: 1, + }}, + }, { + name: "reserve then begin", + actions: []testAction{ + { + shards: []string{"1"}, + reserved: true, + sbc1Reserve: 1, + }, { + shards: []string{"0"}, + transaction: true, + reserved: true, + sbc0Reserve: 1, + sbc0Begin: 1, + }, { + shards: []string{"0", "1"}, + transaction: true, + reserved: true, + sbc1Begin: 1, + }}, + }, { + name: "reserveBegin", + actions: []testAction{ + { + shards: []string{"1"}, + transaction: true, + reserved: true, + sbc1Reserve: 1, + sbc1Begin: 1, + }, { + shards: []string{"0"}, + transaction: true, + reserved: true, + sbc0Reserve: 1, + sbc0Begin: 1, + }, { + shards: []string{"0", "1"}, + transaction: true, + reserved: true, + // nothing needs to be done + }}, + }, { + name: "reserveBegin everywhere", + actions: []testAction{ + { + shards: []string{"0", "1"}, + transaction: true, + reserved: true, + sbc0Reserve: 1, + sbc0Begin: 1, + sbc1Reserve: 1, + sbc1Begin: 1, + }}, + }} + for _, test := range tests { + keyspace := "keyspace" + createSandbox(keyspace) + hc := discovery.NewFakeLegacyHealthCheck() + sc := newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") + sbc0 := hc.AddTestTablet("aa", "0", 1, keyspace, "0", topodatapb.TabletType_REPLICA, true, 1, nil) + sbc1 := hc.AddTestTablet("aa", "1", 1, keyspace, "1", topodatapb.TabletType_REPLICA, true, 1, nil) + + // empty results + sbc0.SetResults([]*sqltypes.Result{{}}) + sbc1.SetResults([]*sqltypes.Result{{}}) + + res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") + + t.Run(test.name, func(t *testing.T) { + session := NewSafeSession(&vtgatepb.Session{}) + for _, action := range test.actions { + session.Session.InTransaction = action.transaction + session.Session.InReservedConn = action.reserved + var destinations []key.Destination + for _, shard := range action.shards { + destinations = append(destinations, key.DestinationShard(shard)) + } + executeOnShards(t, res, keyspace, sc, session, destinations) + assert.EqualValues(t, action.sbc0Reserve, sbc0.ReserveCount.Get(), "sbc0 reserve count") + assert.EqualValues(t, action.sbc0Begin, sbc0.BeginCount.Get(), "sbc0 begin count") + assert.EqualValues(t, action.sbc1Reserve, sbc1.ReserveCount.Get(), "sbc1 reserve count") + assert.EqualValues(t, action.sbc1Begin, sbc1.BeginCount.Get(), "sbc1 begin count") + sbc0.BeginCount.Set(0) + sbc0.ReserveCount.Set(0) + sbc1.BeginCount.Set(0) + sbc1.ReserveCount.Set(0) + } + }) + } +} + +// TODO (harshit): This test should actual fail. +func TestReservedOnMultiReplica(t *testing.T) { + keyspace := "keyspace" + createSandbox(keyspace) + hc := discovery.NewFakeLegacyHealthCheck() + sc := newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") + sbc0_1 := hc.AddTestTablet("aa", "0", 1, keyspace, "0", topodatapb.TabletType_REPLICA, true, 1, nil) + sbc0_2 := hc.AddTestTablet("aa", "2", 1, keyspace, "0", topodatapb.TabletType_REPLICA, true, 1, nil) + // sbc1 := hc.AddTestTablet("aa", "1", 1, keyspace, "1", topodatapb.TabletType_REPLICA, true, 1, nil) + + // empty results + sbc0_1.SetResults([]*sqltypes.Result{{}}) + sbc0_2.SetResults([]*sqltypes.Result{{}}) + + res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") + + session := NewSafeSession(&vtgatepb.Session{InTransaction: false, InReservedConn: true}) + destinations := []key.Destination{key.DestinationShard("0")} + for i := 0; i < 10; i++ { + executeOnShards(t, res, keyspace, sc, session, destinations) + assert.EqualValues(t, 1, sbc0_1.ReserveCount.Get()+sbc0_2.ReserveCount.Get(), "sbc0 reserve count") + assert.EqualValues(t, 0, sbc0_1.BeginCount.Get()+sbc0_2.BeginCount.Get(), "sbc0 begin count") + } +} + +func executeOnShards(t *testing.T, res *srvtopo.Resolver, keyspace string, sc *ScatterConn, session *SafeSession, destinations []key.Destination) { + t.Helper() + rss, _, err := res.ResolveDestinations(ctx, keyspace, topodatapb.TabletType_REPLICA, nil, destinations) + require.NoError(t, err) + + var queries []*querypb.BoundQuery + + for range rss { + queries = append(queries, &querypb.BoundQuery{ + Sql: "query1", + BindVariables: map[string]*querypb.BindVariable{}, + }) + } + + _, errs := sc.ExecuteMultiShard(ctx, rss, queries, session, false) + require.Empty(t, errs) +} + +func TestMultiExecs(t *testing.T) { + createSandbox("TestMultiExecs") + hc := discovery.NewFakeLegacyHealthCheck() + sc := newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") + sbc0 := hc.AddTestTablet("aa", "0", 1, "TestMultiExecs", "0", topodatapb.TabletType_REPLICA, true, 1, nil) + sbc1 := hc.AddTestTablet("aa", "1", 1, "TestMultiExecs", "1", topodatapb.TabletType_REPLICA, true, 1, nil) + + rss := []*srvtopo.ResolvedShard{ + { + Target: &querypb.Target{ + Keyspace: "TestMultiExecs", + Shard: "0", + }, + Gateway: sbc0, + }, + { + Target: &querypb.Target{ + Keyspace: "TestMultiExecs", + Shard: "1", + }, + Gateway: sbc1, + }, + } + queries := []*querypb.BoundQuery{ + { + Sql: "query1", + BindVariables: map[string]*querypb.BindVariable{ + "bv0": sqltypes.Int64BindVariable(0), + }, + }, + { + Sql: "query2", + BindVariables: map[string]*querypb.BindVariable{ + "bv1": sqltypes.Int64BindVariable(1), + }, + }, + } + + _, _ = sc.ExecuteMultiShard(ctx, rss, queries, NewSafeSession(nil), false) + if len(sbc0.Queries) == 0 || len(sbc1.Queries) == 0 { + t.Fatalf("didn't get expected query") + } + wantVars0 := map[string]*querypb.BindVariable{ + "bv0": queries[0].BindVariables["bv0"], + } + if !reflect.DeepEqual(sbc0.Queries[0].BindVariables, wantVars0) { + t.Errorf("got %v, want %v", sbc0.Queries[0].BindVariables, wantVars0) + } + wantVars1 := map[string]*querypb.BindVariable{ + "bv1": queries[1].BindVariables["bv1"], + } + if !reflect.DeepEqual(sbc1.Queries[0].BindVariables, wantVars1) { + t.Errorf("got %+v, want %+v", sbc0.Queries[0].BindVariables, wantVars1) + } + sbc0.Queries = nil + sbc1.Queries = nil + + rss = []*srvtopo.ResolvedShard{ + { + Target: &querypb.Target{ + Keyspace: "TestMultiExecs", + Shard: "0", + }, + Gateway: sbc0, + }, + { + Target: &querypb.Target{ + Keyspace: "TestMultiExecs", + Shard: "1", + }, + Gateway: sbc1, + }, + } + bvs := []map[string]*querypb.BindVariable{ + { + "bv0": sqltypes.Int64BindVariable(0), + }, + { + "bv1": sqltypes.Int64BindVariable(1), + }, + } + _ = sc.StreamExecuteMulti(ctx, "query", rss, bvs, nil, func(*sqltypes.Result) error { + return nil + }) + if !reflect.DeepEqual(sbc0.Queries[0].BindVariables, wantVars0) { + t.Errorf("got %+v, want %+v", sbc0.Queries[0].BindVariables, wantVars0) + } + if !reflect.DeepEqual(sbc1.Queries[0].BindVariables, wantVars1) { + t.Errorf("got %+v, want %+v", sbc0.Queries[0].BindVariables, wantVars1) + } +} + +func TestScatterConnStreamExecuteSendError(t *testing.T) { + createSandbox("TestScatterConnStreamExecuteSendError") + hc := discovery.NewFakeLegacyHealthCheck() + sc := newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") + hc.AddTestTablet("aa", "0", 1, "TestScatterConnStreamExecuteSendError", "0", topodatapb.TabletType_REPLICA, true, 1, nil) + res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") + rss, err := res.ResolveDestination(ctx, "TestScatterConnStreamExecuteSendError", topodatapb.TabletType_REPLICA, key.DestinationShard("0")) + if err != nil { + t.Fatalf("ResolveDestination failed: %v", err) + } + err = sc.StreamExecute(ctx, "query", nil, rss, nil, func(*sqltypes.Result) error { + return fmt.Errorf("send error") + }) + want := "send error" + // Ensure that we handle send errors. + if err == nil || !strings.Contains(err.Error(), want) { + t.Errorf("got %s, must contain %v", err, want) + } +} + +func TestScatterConnSingleDB(t *testing.T) { + createSandbox("TestScatterConnSingleDB") + hc := discovery.NewFakeLegacyHealthCheck() + + hc.Reset() + sc := newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") + hc.AddTestTablet("aa", "0", 1, "TestScatterConnSingleDB", "0", topodatapb.TabletType_MASTER, true, 1, nil) + hc.AddTestTablet("aa", "1", 1, "TestScatterConnSingleDB", "1", topodatapb.TabletType_MASTER, true, 1, nil) + + res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") + rss0, err := res.ResolveDestination(ctx, "TestScatterConnSingleDB", topodatapb.TabletType_MASTER, key.DestinationShard("0")) + require.NoError(t, err) + rss1, err := res.ResolveDestination(ctx, "TestScatterConnSingleDB", topodatapb.TabletType_MASTER, key.DestinationShard("1")) + require.NoError(t, err) + + want := "multi-db transaction attempted" + + // TransactionMode_SINGLE in session + session := NewSafeSession(&vtgatepb.Session{InTransaction: true, TransactionMode: vtgatepb.TransactionMode_SINGLE}) + queries := []*querypb.BoundQuery{{Sql: "query1"}} + _, errors := sc.ExecuteMultiShard(ctx, rss0, queries, session, false) + require.Empty(t, errors) + _, errors = sc.ExecuteMultiShard(ctx, rss1, queries, session, false) + require.Error(t, errors[0]) + assert.Contains(t, errors[0].Error(), want) + + // TransactionMode_SINGLE in txconn + sc.txConn.mode = vtgatepb.TransactionMode_SINGLE + session = NewSafeSession(&vtgatepb.Session{InTransaction: true}) + _, errors = sc.ExecuteMultiShard(ctx, rss0, queries, session, false) + require.Empty(t, errors) + _, errors = sc.ExecuteMultiShard(ctx, rss1, queries, session, false) + require.Error(t, errors[0]) + assert.Contains(t, errors[0].Error(), want) + + // TransactionMode_MULTI in txconn. Should not fail. + sc.txConn.mode = vtgatepb.TransactionMode_MULTI + session = NewSafeSession(&vtgatepb.Session{InTransaction: true}) + _, errors = sc.ExecuteMultiShard(ctx, rss0, queries, session, false) + require.Empty(t, errors) + _, errors = sc.ExecuteMultiShard(ctx, rss1, queries, session, false) + require.Empty(t, errors) +} + +func TestAppendResult(t *testing.T) { + qr := new(sqltypes.Result) + innerqr1 := &sqltypes.Result{ + Fields: []*querypb.Field{}, + Rows: [][]sqltypes.Value{}, + } + innerqr2 := &sqltypes.Result{ + Fields: []*querypb.Field{ + {Name: "foo", Type: sqltypes.Int8}, + }, + RowsAffected: 1, + InsertID: 1, + Rows: [][]sqltypes.Value{ + {sqltypes.NewVarBinary("abcd")}, + }, + } + // test one empty result + qr.AppendResult(innerqr1) + qr.AppendResult(innerqr2) + if len(qr.Fields) != 1 { + t.Errorf("want 1, got %v", len(qr.Fields)) + } + if qr.RowsAffected != 1 { + t.Errorf("want 1, got %v", qr.RowsAffected) + } + if qr.InsertID != 1 { + t.Errorf("want 1, got %v", qr.InsertID) + } + if len(qr.Rows) != 1 { + t.Errorf("want 1, got %v", len(qr.Rows)) + } + // test two valid results + qr = new(sqltypes.Result) + qr.AppendResult(innerqr2) + qr.AppendResult(innerqr2) + if len(qr.Fields) != 1 { + t.Errorf("want 1, got %v", len(qr.Fields)) + } + if qr.RowsAffected != 2 { + t.Errorf("want 2, got %v", qr.RowsAffected) + } + if qr.InsertID != 1 { + t.Errorf("want 1, got %v", qr.InsertID) + } + if len(qr.Rows) != 2 { + t.Errorf("want 2, got %v", len(qr.Rows)) + } +} + +func newTestLegacyScatterConn(hc discovery.LegacyHealthCheck, serv srvtopo.Server, cell string) *ScatterConn { + // The topo.Server is used to start watching the cells described + // in '-cells_to_watch' command line parameter, which is + // empty by default. So it's unused in this test, set to nil. + gw := GatewayCreator()(ctx, hc, serv, cell, 3) + tc := NewTxConn(gw, vtgatepb.TransactionMode_TWOPC) + return NewLegacyScatterConn("", tc, gw, hc) +} + +func newTestScatterConn(hc discovery.HealthCheck, serv srvtopo.Server, cell string) *ScatterConn { + // The topo.Server is used to start watching the cells described + // in '-cells_to_watch' command line parameter, which is + // empty by default. So it's unused in this test, set to nil. + gw := NewTabletGateway(ctx, hc, serv, cell) + tc := NewTxConn(gw, vtgatepb.TransactionMode_TWOPC) + return NewScatterConn("", tc, gw) +} + +var ctx = context.Background() diff --git a/go/vt/vtgate/scatter_conn_test.go b/go/vt/vtgate/scatter_conn_test.go index 4ed65a573fd..f54c161a18e 100644 --- a/go/vt/vtgate/scatter_conn_test.go +++ b/go/vt/vtgate/scatter_conn_test.go @@ -17,25 +17,16 @@ limitations under the License. package vtgate import ( - "fmt" - "reflect" - "strings" "testing" "vitess.io/vitess/go/test/utils" - "github.com/stretchr/testify/assert" - - "golang.org/x/net/context" - "github.com/stretchr/testify/require" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/discovery" - "vitess.io/vitess/go/vt/key" querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/vterrors" ) @@ -45,8 +36,8 @@ import ( func TestExecuteFailOnAutocommit(t *testing.T) { createSandbox("TestExecuteFailOnAutocommit") - hc := discovery.NewFakeLegacyHealthCheck() - sc := newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") + hc := discovery.NewFakeHealthCheck() + sc := newTestScatterConn(hc, new(sandboxTopo), "aa") sbc0 := hc.AddTestTablet("aa", "0", 1, "TestExecuteFailOnAutocommit", "0", topodatapb.TabletType_MASTER, true, 1, nil) sbc1 := hc.AddTestTablet("aa", "1", 1, "TestExecuteFailOnAutocommit", "1", topodatapb.TabletType_MASTER, true, 1, nil) @@ -104,633 +95,3 @@ func TestExecuteFailOnAutocommit(t *testing.T) { utils.MustMatch(t, 0, len(sbc0.Queries), "") utils.MustMatch(t, []*querypb.BoundQuery{queries[1]}, sbc1.Queries, "") } - -func TestScatterConnExecuteMulti(t *testing.T) { - testScatterConnGeneric(t, "TestScatterConnExecuteMultiShard", func(sc *ScatterConn, shards []string) (*sqltypes.Result, error) { - res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - rss, err := res.ResolveDestination(ctx, "TestScatterConnExecuteMultiShard", topodatapb.TabletType_REPLICA, key.DestinationShards(shards)) - if err != nil { - return nil, err - } - - queries := make([]*querypb.BoundQuery, len(rss)) - for i := range rss { - queries[i] = &querypb.BoundQuery{ - Sql: "query", - BindVariables: nil, - } - } - - qr, errs := sc.ExecuteMultiShard(ctx, rss, queries, NewSafeSession(nil), false /*autocommit*/) - return qr, vterrors.Aggregate(errs) - }) -} - -func TestScatterConnStreamExecute(t *testing.T) { - testScatterConnGeneric(t, "TestScatterConnStreamExecute", func(sc *ScatterConn, shards []string) (*sqltypes.Result, error) { - res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - rss, err := res.ResolveDestination(ctx, "TestScatterConnStreamExecute", topodatapb.TabletType_REPLICA, key.DestinationShards(shards)) - if err != nil { - return nil, err - } - - qr := new(sqltypes.Result) - err = sc.StreamExecute(ctx, "query", nil, rss, nil, func(r *sqltypes.Result) error { - qr.AppendResult(r) - return nil - }) - return qr, err - }) -} - -func TestScatterConnStreamExecuteMulti(t *testing.T) { - testScatterConnGeneric(t, "TestScatterConnStreamExecuteMulti", func(sc *ScatterConn, shards []string) (*sqltypes.Result, error) { - res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - rss, err := res.ResolveDestination(ctx, "TestScatterConnStreamExecuteMulti", topodatapb.TabletType_REPLICA, key.DestinationShards(shards)) - if err != nil { - return nil, err - } - bvs := make([]map[string]*querypb.BindVariable, len(rss)) - qr := new(sqltypes.Result) - err = sc.StreamExecuteMulti(ctx, "query", rss, bvs, nil, func(r *sqltypes.Result) error { - qr.AppendResult(r) - return nil - }) - return qr, err - }) -} - -// verifyScatterConnError checks that a returned error has the expected message, -// type, and error code. -func verifyScatterConnError(t *testing.T, err error, wantErr string, wantCode vtrpcpb.Code) { - t.Helper() - if err == nil || err.Error() != wantErr { - t.Errorf("wanted error: %s, got error: %v", wantErr, err) - } - if code := vterrors.Code(err); code != wantCode { - t.Errorf("wanted error code: %s, got: %v", wantCode, code) - } -} - -func testScatterConnGeneric(t *testing.T, name string, f func(sc *ScatterConn, shards []string) (*sqltypes.Result, error)) { - hc := discovery.NewFakeLegacyHealthCheck() - - // no shard - s := createSandbox(name) - sc := newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") - qr, err := f(sc, nil) - require.NoError(t, err) - if qr.RowsAffected != 0 { - t.Errorf("want 0, got %v", qr.RowsAffected) - } - - // single shard - s.Reset() - sc = newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") - sbc := hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_REPLICA, true, 1, nil) - sbc.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 - _, err = f(sc, []string{"0"}) - want := fmt.Sprintf("target: %v.0.replica, used tablet: aa-0 (0): INVALID_ARGUMENT error", name) - // Verify server error string. - if err == nil || err.Error() != want { - t.Errorf("want %s, got %v", want, err) - } - // Ensure that we tried only once. - if execCount := sbc.ExecCount.Get(); execCount != 1 { - t.Errorf("want 1, got %v", execCount) - } - - // two shards - s.Reset() - hc.Reset() - sc = newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") - sbc0 := hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_REPLICA, true, 1, nil) - sbc1 := hc.AddTestTablet("aa", "1", 1, name, "1", topodatapb.TabletType_REPLICA, true, 1, nil) - sbc0.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 - sbc1.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 - _, err = f(sc, []string{"0", "1"}) - // Verify server errors are consolidated. - want = fmt.Sprintf("target: %v.0.replica, used tablet: aa-0 (0): INVALID_ARGUMENT error\ntarget: %v.1.replica, used tablet: aa-0 (1): INVALID_ARGUMENT error", name, name) - verifyScatterConnError(t, err, want, vtrpcpb.Code_INVALID_ARGUMENT) - // Ensure that we tried only once. - if execCount := sbc0.ExecCount.Get(); execCount != 1 { - t.Errorf("want 1, got %v", execCount) - } - if execCount := sbc1.ExecCount.Get(); execCount != 1 { - t.Errorf("want 1, got %v", execCount) - } - - // two shards with different errors - s.Reset() - hc.Reset() - sc = newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") - sbc0 = hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_REPLICA, true, 1, nil) - sbc1 = hc.AddTestTablet("aa", "1", 1, name, "1", topodatapb.TabletType_REPLICA, true, 1, nil) - sbc0.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 - sbc1.MustFailCodes[vtrpcpb.Code_RESOURCE_EXHAUSTED] = 1 - _, err = f(sc, []string{"0", "1"}) - // Verify server errors are consolidated. - want = fmt.Sprintf("target: %v.0.replica, used tablet: aa-0 (0): INVALID_ARGUMENT error\ntarget: %v.1.replica, used tablet: aa-0 (1): RESOURCE_EXHAUSTED error", name, name) - // We should only surface the higher priority error code - verifyScatterConnError(t, err, want, vtrpcpb.Code_INVALID_ARGUMENT) - // Ensure that we tried only once. - if execCount := sbc0.ExecCount.Get(); execCount != 1 { - t.Errorf("want 1, got %v", execCount) - } - if execCount := sbc1.ExecCount.Get(); execCount != 1 { - t.Errorf("want 1, got %v", execCount) - } - - // duplicate shards - s.Reset() - hc.Reset() - sc = newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") - sbc = hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_REPLICA, true, 1, nil) - _, _ = f(sc, []string{"0", "0"}) - // Ensure that we executed only once. - if execCount := sbc.ExecCount.Get(); execCount != 1 { - t.Errorf("want 1, got %v", execCount) - } - - // no errors - s.Reset() - hc.Reset() - sc = newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") - sbc0 = hc.AddTestTablet("aa", "0", 1, name, "0", topodatapb.TabletType_REPLICA, true, 1, nil) - sbc1 = hc.AddTestTablet("aa", "1", 1, name, "1", topodatapb.TabletType_REPLICA, true, 1, nil) - qr, err = f(sc, []string{"0", "1"}) - if err != nil { - t.Fatalf("want nil, got %v", err) - } - if execCount := sbc0.ExecCount.Get(); execCount != 1 { - t.Errorf("want 1, got %v", execCount) - } - if execCount := sbc1.ExecCount.Get(); execCount != 1 { - t.Errorf("want 1, got %v", execCount) - } - if qr.RowsAffected != 2 { - t.Errorf("want 2, got %v", qr.RowsAffected) - } - if len(qr.Rows) != 2 { - t.Errorf("want 2, got %v", len(qr.Rows)) - } -} - -func TestMaxMemoryRows(t *testing.T) { - save := *maxMemoryRows - *maxMemoryRows = 3 - defer func() { *maxMemoryRows = save }() - - createSandbox("TestMaxMemoryRows") - hc := discovery.NewFakeLegacyHealthCheck() - sc := newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") - sbc0 := hc.AddTestTablet("aa", "0", 1, "TestMaxMemoryRows", "0", topodatapb.TabletType_REPLICA, true, 1, nil) - sbc1 := hc.AddTestTablet("aa", "1", 1, "TestMaxMemoryRows", "1", topodatapb.TabletType_REPLICA, true, 1, nil) - - tworows := &sqltypes.Result{ - Rows: [][]sqltypes.Value{{ - sqltypes.NewInt64(1), - }, { - sqltypes.NewInt64(1), - }}, - RowsAffected: 1, - InsertID: 1, - } - sbc0.SetResults([]*sqltypes.Result{tworows, tworows}) - sbc1.SetResults([]*sqltypes.Result{tworows, tworows}) - - res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - rss, _, err := res.ResolveDestinations(ctx, "TestMaxMemoryRows", topodatapb.TabletType_REPLICA, nil, - []key.Destination{key.DestinationShard("0"), key.DestinationShard("1")}) - require.NoError(t, err) - - session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - - want := "in-memory row count exceeded allowed limit of 3" - queries := []*querypb.BoundQuery{{ - Sql: "query1", - BindVariables: map[string]*querypb.BindVariable{}, - }, { - Sql: "query1", - BindVariables: map[string]*querypb.BindVariable{}, - }} - _, errs := sc.ExecuteMultiShard(ctx, rss, queries, session, false) - assert.EqualError(t, errs[0], want) -} - -func TestReservedBeginTableDriven(t *testing.T) { - type testAction struct { - transaction, reserved bool - shards []string - sbc0Reserve, sbc1Reserve int64 - sbc0Begin, sbc1Begin int64 - } - type testCase struct { - name string - actions []testAction - } - - tests := []testCase{{ - name: "begin", - actions: []testAction{ - { - shards: []string{"0"}, - transaction: true, - sbc0Begin: 1, - }, { - shards: []string{"0", "1"}, - transaction: true, - sbc1Begin: 1, - }, { - shards: []string{"0", "1"}, - transaction: true, - // nothing needs to be done - }}, - }, { - name: "reserve", - actions: []testAction{ - { - shards: []string{"1"}, - reserved: true, - sbc1Reserve: 1, - }, { - shards: []string{"0", "1"}, - reserved: true, - sbc0Reserve: 1, - }, { - shards: []string{"0", "1"}, - reserved: true, - // nothing needs to be done - }}, - }, { - name: "reserve everywhere", - actions: []testAction{ - { - shards: []string{"0", "1"}, - reserved: true, - sbc0Reserve: 1, - sbc1Reserve: 1, - }}, - }, { - name: "begin then reserve", - actions: []testAction{ - { - shards: []string{"0"}, - transaction: true, - sbc0Begin: 1, - }, { - shards: []string{"0", "1"}, - transaction: true, - reserved: true, - sbc0Reserve: 1, - sbc1Reserve: 1, - sbc1Begin: 1, - }}, - }, { - name: "reserve then begin", - actions: []testAction{ - { - shards: []string{"1"}, - reserved: true, - sbc1Reserve: 1, - }, { - shards: []string{"0"}, - transaction: true, - reserved: true, - sbc0Reserve: 1, - sbc0Begin: 1, - }, { - shards: []string{"0", "1"}, - transaction: true, - reserved: true, - sbc1Begin: 1, - }}, - }, { - name: "reserveBegin", - actions: []testAction{ - { - shards: []string{"1"}, - transaction: true, - reserved: true, - sbc1Reserve: 1, - sbc1Begin: 1, - }, { - shards: []string{"0"}, - transaction: true, - reserved: true, - sbc0Reserve: 1, - sbc0Begin: 1, - }, { - shards: []string{"0", "1"}, - transaction: true, - reserved: true, - // nothing needs to be done - }}, - }, { - name: "reserveBegin everywhere", - actions: []testAction{ - { - shards: []string{"0", "1"}, - transaction: true, - reserved: true, - sbc0Reserve: 1, - sbc0Begin: 1, - sbc1Reserve: 1, - sbc1Begin: 1, - }}, - }} - for _, test := range tests { - keyspace := "keyspace" - createSandbox(keyspace) - hc := discovery.NewFakeLegacyHealthCheck() - sc := newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") - sbc0 := hc.AddTestTablet("aa", "0", 1, keyspace, "0", topodatapb.TabletType_REPLICA, true, 1, nil) - sbc1 := hc.AddTestTablet("aa", "1", 1, keyspace, "1", topodatapb.TabletType_REPLICA, true, 1, nil) - - // empty results - sbc0.SetResults([]*sqltypes.Result{{}}) - sbc1.SetResults([]*sqltypes.Result{{}}) - - res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - - t.Run(test.name, func(t *testing.T) { - session := NewSafeSession(&vtgatepb.Session{}) - for _, action := range test.actions { - session.Session.InTransaction = action.transaction - session.Session.InReservedConn = action.reserved - var destinations []key.Destination - for _, shard := range action.shards { - destinations = append(destinations, key.DestinationShard(shard)) - } - executeOnShards(t, res, keyspace, sc, session, destinations) - assert.EqualValues(t, action.sbc0Reserve, sbc0.ReserveCount.Get(), "sbc0 reserve count") - assert.EqualValues(t, action.sbc0Begin, sbc0.BeginCount.Get(), "sbc0 begin count") - assert.EqualValues(t, action.sbc1Reserve, sbc1.ReserveCount.Get(), "sbc1 reserve count") - assert.EqualValues(t, action.sbc1Begin, sbc1.BeginCount.Get(), "sbc1 begin count") - sbc0.BeginCount.Set(0) - sbc0.ReserveCount.Set(0) - sbc1.BeginCount.Set(0) - sbc1.ReserveCount.Set(0) - } - }) - } -} - -// TODO (harshit): This test should actual fail. -func TestReservedOnMultiReplica(t *testing.T) { - keyspace := "keyspace" - createSandbox(keyspace) - hc := discovery.NewFakeLegacyHealthCheck() - sc := newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") - sbc0_1 := hc.AddTestTablet("aa", "0", 1, keyspace, "0", topodatapb.TabletType_REPLICA, true, 1, nil) - sbc0_2 := hc.AddTestTablet("aa", "2", 1, keyspace, "0", topodatapb.TabletType_REPLICA, true, 1, nil) - // sbc1 := hc.AddTestTablet("aa", "1", 1, keyspace, "1", topodatapb.TabletType_REPLICA, true, 1, nil) - - // empty results - sbc0_1.SetResults([]*sqltypes.Result{{}}) - sbc0_2.SetResults([]*sqltypes.Result{{}}) - - res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - - session := NewSafeSession(&vtgatepb.Session{InTransaction: false, InReservedConn: true}) - destinations := []key.Destination{key.DestinationShard("0")} - for i := 0; i < 10; i++ { - executeOnShards(t, res, keyspace, sc, session, destinations) - assert.EqualValues(t, 1, sbc0_1.ReserveCount.Get()+sbc0_2.ReserveCount.Get(), "sbc0 reserve count") - assert.EqualValues(t, 0, sbc0_1.BeginCount.Get()+sbc0_2.BeginCount.Get(), "sbc0 begin count") - } -} - -func executeOnShards(t *testing.T, res *srvtopo.Resolver, keyspace string, sc *ScatterConn, session *SafeSession, destinations []key.Destination) { - t.Helper() - rss, _, err := res.ResolveDestinations(ctx, keyspace, topodatapb.TabletType_REPLICA, nil, destinations) - require.NoError(t, err) - - var queries []*querypb.BoundQuery - - for range rss { - queries = append(queries, &querypb.BoundQuery{ - Sql: "query1", - BindVariables: map[string]*querypb.BindVariable{}, - }) - } - - _, errs := sc.ExecuteMultiShard(ctx, rss, queries, session, false) - require.Empty(t, errs) -} - -func TestMultiExecs(t *testing.T) { - createSandbox("TestMultiExecs") - hc := discovery.NewFakeLegacyHealthCheck() - sc := newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") - sbc0 := hc.AddTestTablet("aa", "0", 1, "TestMultiExecs", "0", topodatapb.TabletType_REPLICA, true, 1, nil) - sbc1 := hc.AddTestTablet("aa", "1", 1, "TestMultiExecs", "1", topodatapb.TabletType_REPLICA, true, 1, nil) - - rss := []*srvtopo.ResolvedShard{ - { - Target: &querypb.Target{ - Keyspace: "TestMultiExecs", - Shard: "0", - }, - Gateway: sbc0, - }, - { - Target: &querypb.Target{ - Keyspace: "TestMultiExecs", - Shard: "1", - }, - Gateway: sbc1, - }, - } - queries := []*querypb.BoundQuery{ - { - Sql: "query1", - BindVariables: map[string]*querypb.BindVariable{ - "bv0": sqltypes.Int64BindVariable(0), - }, - }, - { - Sql: "query2", - BindVariables: map[string]*querypb.BindVariable{ - "bv1": sqltypes.Int64BindVariable(1), - }, - }, - } - - _, _ = sc.ExecuteMultiShard(ctx, rss, queries, NewSafeSession(nil), false) - if len(sbc0.Queries) == 0 || len(sbc1.Queries) == 0 { - t.Fatalf("didn't get expected query") - } - wantVars0 := map[string]*querypb.BindVariable{ - "bv0": queries[0].BindVariables["bv0"], - } - if !reflect.DeepEqual(sbc0.Queries[0].BindVariables, wantVars0) { - t.Errorf("got %v, want %v", sbc0.Queries[0].BindVariables, wantVars0) - } - wantVars1 := map[string]*querypb.BindVariable{ - "bv1": queries[1].BindVariables["bv1"], - } - if !reflect.DeepEqual(sbc1.Queries[0].BindVariables, wantVars1) { - t.Errorf("got %+v, want %+v", sbc0.Queries[0].BindVariables, wantVars1) - } - sbc0.Queries = nil - sbc1.Queries = nil - - rss = []*srvtopo.ResolvedShard{ - { - Target: &querypb.Target{ - Keyspace: "TestMultiExecs", - Shard: "0", - }, - Gateway: sbc0, - }, - { - Target: &querypb.Target{ - Keyspace: "TestMultiExecs", - Shard: "1", - }, - Gateway: sbc1, - }, - } - bvs := []map[string]*querypb.BindVariable{ - { - "bv0": sqltypes.Int64BindVariable(0), - }, - { - "bv1": sqltypes.Int64BindVariable(1), - }, - } - _ = sc.StreamExecuteMulti(ctx, "query", rss, bvs, nil, func(*sqltypes.Result) error { - return nil - }) - if !reflect.DeepEqual(sbc0.Queries[0].BindVariables, wantVars0) { - t.Errorf("got %+v, want %+v", sbc0.Queries[0].BindVariables, wantVars0) - } - if !reflect.DeepEqual(sbc1.Queries[0].BindVariables, wantVars1) { - t.Errorf("got %+v, want %+v", sbc0.Queries[0].BindVariables, wantVars1) - } -} - -func TestScatterConnStreamExecuteSendError(t *testing.T) { - createSandbox("TestScatterConnStreamExecuteSendError") - hc := discovery.NewFakeLegacyHealthCheck() - sc := newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") - hc.AddTestTablet("aa", "0", 1, "TestScatterConnStreamExecuteSendError", "0", topodatapb.TabletType_REPLICA, true, 1, nil) - res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - rss, err := res.ResolveDestination(ctx, "TestScatterConnStreamExecuteSendError", topodatapb.TabletType_REPLICA, key.DestinationShard("0")) - if err != nil { - t.Fatalf("ResolveDestination failed: %v", err) - } - err = sc.StreamExecute(ctx, "query", nil, rss, nil, func(*sqltypes.Result) error { - return fmt.Errorf("send error") - }) - want := "send error" - // Ensure that we handle send errors. - if err == nil || !strings.Contains(err.Error(), want) { - t.Errorf("got %s, must contain %v", err, want) - } -} - -func TestScatterConnSingleDB(t *testing.T) { - createSandbox("TestScatterConnSingleDB") - hc := discovery.NewFakeLegacyHealthCheck() - - hc.Reset() - sc := newTestLegacyScatterConn(hc, new(sandboxTopo), "aa") - hc.AddTestTablet("aa", "0", 1, "TestScatterConnSingleDB", "0", topodatapb.TabletType_MASTER, true, 1, nil) - hc.AddTestTablet("aa", "1", 1, "TestScatterConnSingleDB", "1", topodatapb.TabletType_MASTER, true, 1, nil) - - res := srvtopo.NewResolver(&sandboxTopo{}, sc.gateway, "aa") - rss0, err := res.ResolveDestination(ctx, "TestScatterConnSingleDB", topodatapb.TabletType_MASTER, key.DestinationShard("0")) - require.NoError(t, err) - rss1, err := res.ResolveDestination(ctx, "TestScatterConnSingleDB", topodatapb.TabletType_MASTER, key.DestinationShard("1")) - require.NoError(t, err) - - want := "multi-db transaction attempted" - - // TransactionMode_SINGLE in session - session := NewSafeSession(&vtgatepb.Session{InTransaction: true, TransactionMode: vtgatepb.TransactionMode_SINGLE}) - queries := []*querypb.BoundQuery{{Sql: "query1"}} - _, errors := sc.ExecuteMultiShard(ctx, rss0, queries, session, false) - require.Empty(t, errors) - _, errors = sc.ExecuteMultiShard(ctx, rss1, queries, session, false) - require.Error(t, errors[0]) - assert.Contains(t, errors[0].Error(), want) - - // TransactionMode_SINGLE in txconn - sc.txConn.mode = vtgatepb.TransactionMode_SINGLE - session = NewSafeSession(&vtgatepb.Session{InTransaction: true}) - _, errors = sc.ExecuteMultiShard(ctx, rss0, queries, session, false) - require.Empty(t, errors) - _, errors = sc.ExecuteMultiShard(ctx, rss1, queries, session, false) - require.Error(t, errors[0]) - assert.Contains(t, errors[0].Error(), want) - - // TransactionMode_MULTI in txconn. Should not fail. - sc.txConn.mode = vtgatepb.TransactionMode_MULTI - session = NewSafeSession(&vtgatepb.Session{InTransaction: true}) - _, errors = sc.ExecuteMultiShard(ctx, rss0, queries, session, false) - require.Empty(t, errors) - _, errors = sc.ExecuteMultiShard(ctx, rss1, queries, session, false) - require.Empty(t, errors) -} - -func TestAppendResult(t *testing.T) { - qr := new(sqltypes.Result) - innerqr1 := &sqltypes.Result{ - Fields: []*querypb.Field{}, - Rows: [][]sqltypes.Value{}, - } - innerqr2 := &sqltypes.Result{ - Fields: []*querypb.Field{ - {Name: "foo", Type: sqltypes.Int8}, - }, - RowsAffected: 1, - InsertID: 1, - Rows: [][]sqltypes.Value{ - {sqltypes.NewVarBinary("abcd")}, - }, - } - // test one empty result - qr.AppendResult(innerqr1) - qr.AppendResult(innerqr2) - if len(qr.Fields) != 1 { - t.Errorf("want 1, got %v", len(qr.Fields)) - } - if qr.RowsAffected != 1 { - t.Errorf("want 1, got %v", qr.RowsAffected) - } - if qr.InsertID != 1 { - t.Errorf("want 1, got %v", qr.InsertID) - } - if len(qr.Rows) != 1 { - t.Errorf("want 1, got %v", len(qr.Rows)) - } - // test two valid results - qr = new(sqltypes.Result) - qr.AppendResult(innerqr2) - qr.AppendResult(innerqr2) - if len(qr.Fields) != 1 { - t.Errorf("want 1, got %v", len(qr.Fields)) - } - if qr.RowsAffected != 2 { - t.Errorf("want 2, got %v", qr.RowsAffected) - } - if qr.InsertID != 1 { - t.Errorf("want 1, got %v", qr.InsertID) - } - if len(qr.Rows) != 2 { - t.Errorf("want 2, got %v", len(qr.Rows)) - } -} - -func newTestLegacyScatterConn(hc discovery.LegacyHealthCheck, serv srvtopo.Server, cell string) *ScatterConn { - // The topo.Server is used to start watching the cells described - // in '-cells_to_watch' command line parameter, which is - // empty by default. So it's unused in this test, set to nil. - gw := GatewayCreator()(ctx, hc, serv, cell, 3) - tc := NewTxConn(gw, vtgatepb.TransactionMode_TWOPC) - return NewLegacyScatterConn("", tc, gw, hc) -} - -var ctx = context.Background() diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 68255e16add..de03ae079d0 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -17,6 +17,8 @@ limitations under the License. package vtgate import ( + "context" + "flag" "fmt" "math/rand" "sort" @@ -25,8 +27,6 @@ import ( "vitess.io/vitess/go/vt/topo/topoproto" - "golang.org/x/net/context" - "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/srvtopo" @@ -48,42 +48,17 @@ func init() { RegisterGatewayCreator(tabletGatewayImplementation, createTabletGateway) } -//HealthCheck declares what the TabletGateway needs from the HealthCheck -type HealthCheck interface { - // CacheStatus returns a displayable version of the health check cache. - CacheStatus() discovery.TabletsCacheStatusList - - // Close stops the healthcheck. - Close() error - - // WaitForAllServingTablets waits for at least one healthy serving tablet in - // each given target before returning. - // It will return ctx.Err() if the context is canceled. - // It will return an error if it can't read the necessary topology records. - WaitForAllServingTablets(ctx context.Context, targets []*querypb.Target) error - - // TabletConnection returns the TabletConn of the given tablet. - TabletConnection(alias *topodatapb.TabletAlias) (queryservice.QueryService, error) - - // RegisterStats registers the connection counts stats - RegisterStats() - - // GetHealthyTabletStats returns only the healthy tablets. - // The returned array is owned by the caller. - // For TabletType_MASTER, this will only return at most one entry, - // the most recent tablet of type master. - // This returns a copy of the data so that callers can access without - // synchronization - GetHealthyTabletStats(target *querypb.Target) []*discovery.TabletHealth -} - -var _ HealthCheck = (*discovery.HealthCheckImpl)(nil) +var _ discovery.HealthCheck = (*discovery.HealthCheckImpl)(nil) +var ( + // CellsToWatch is the list of cells the healthcheck operates over. If it is empty, only the local cell is watched + CellsToWatch = flag.String("cells_to_watch", "", "comma-separated list of cells for watching tablets") +) // TabletGateway implements the Gateway interface. // This implementation uses the new healthcheck module. type TabletGateway struct { queryservice.QueryService - hc HealthCheck + hc discovery.HealthCheck srvTopoServer srvtopo.Server localCell string retryCount int @@ -99,11 +74,16 @@ type TabletGateway struct { } func createTabletGateway(ctx context.Context, _ discovery.LegacyHealthCheck, serv srvtopo.Server, cell string, _ int) Gateway { - return NewTabletGateway(ctx, serv, cell) + // we ignore the passed in LegacyHealthCheck and let TabletGateway create it's own HealthCheck + return NewTabletGateway(ctx, nil /*discovery.Healthcheck*/, serv, cell) +} + +func createHealthCheck(ctx context.Context, retryDelay, timeout time.Duration, ts *topo.Server, cell, cellsToWatch string) discovery.HealthCheck { + return discovery.NewHealthCheck(ctx, retryDelay, timeout, ts, cell, cellsToWatch) } // NewTabletGateway creates and returns a new TabletGateway -func NewTabletGateway(ctx context.Context, serv srvtopo.Server, localCell string) *TabletGateway { +func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, localCell string) *TabletGateway { var topoServer *topo.Server if serv != nil { var err error @@ -112,7 +92,9 @@ func NewTabletGateway(ctx context.Context, serv srvtopo.Server, localCell string log.Exitf("Unable to create new TabletGateway: %v", err) } } - hc := discovery.NewHealthCheck(ctx, *HealthCheckRetryDelay, *HealthCheckTimeout, topoServer, localCell) + if hc == nil { + hc = createHealthCheck(ctx, *HealthCheckRetryDelay, *HealthCheckTimeout, topoServer, localCell, *CellsToWatch) + } gw := &TabletGateway{ hc: hc, diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 781248f4874..33c0e0a1e6a 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -142,7 +142,8 @@ func Init(ctx context.Context, serv srvtopo.Server, cell string, tabletTypesToWa // Build objects from low to high level. // Start with the gateway. If we can't reach the topology service, // we can't go on much further, so we log.Fatal out. - gw := NewTabletGateway(ctx, serv, cell) + // TabletGateway can create it's own healthcheck + gw := NewTabletGateway(ctx, nil /*discovery.Healthcheck*/, serv, cell) gw.RegisterStats() if err := WaitForTablets(gw, tabletTypesToWait); err != nil { log.Fatalf("gateway.WaitForTablets failed: %v", err)