From c6d08bf3df049dc27877cb4fcd13bc9109ca6c4c Mon Sep 17 00:00:00 2001 From: deepthi Date: Mon, 30 Mar 2020 18:50:46 -0700 Subject: [PATCH 1/2] resurrect gateway interface and command line option Signed-off-by: deepthi --- go/vt/vtexplain/vtexplain_vtgate.go | 2 +- .../{tabletgateway.go => discoverygateway.go} | 73 ++++------- ...teway_test.go => discoverygateway_test.go} | 37 +++--- go/vt/vtgate/gateway.go | 119 ++++++++++++++++++ go/vt/vtgate/gateway_test_suite.go | 11 +- go/vt/vtgate/grpc_discovery_test.go | 5 +- go/vt/vtgate/scatter_conn.go | 4 +- go/vt/vtgate/scatter_conn_test.go | 2 +- go/vt/vtgate/status_test.go | 1 + go/vt/vtgate/tx_conn.go | 4 +- go/vt/vtgate/vstream_manager_test.go | 2 +- go/vt/vtgate/vtgate.go | 6 +- go/vt/vttablet/queryservice/wrapped.go | 2 +- 13 files changed, 184 insertions(+), 84 deletions(-) rename go/vt/vtgate/{tabletgateway.go => discoverygateway.go} (80%) rename go/vt/vtgate/{tabletgateway_test.go => discoverygateway_test.go} (90%) create mode 100644 go/vt/vtgate/gateway.go diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go index 9cedd20807a..97098b0db34 100644 --- a/go/vt/vtexplain/vtexplain_vtgate.go +++ b/go/vt/vtexplain/vtexplain_vtgate.go @@ -72,7 +72,7 @@ func initVtgateExecutor(vSchemaStr string, opts *Options) error { func newFakeResolver(opts *Options, hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vtgate.Resolver { ctx := context.Background() - gw := vtgate.NewTabletGateway(ctx, hc, serv, cell, 3) + gw := vtgate.GetGatewayCreator()(ctx, hc, serv, cell, 3) gw.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_REPLICA}) txMode := vtgatepb.TransactionMode_MULTI diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/discoverygateway.go similarity index 80% rename from go/vt/vtgate/tabletgateway.go rename to go/vt/vtgate/discoverygateway.go index 1e7db1a6f33..474ec99d35f 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/discoverygateway.go @@ -19,12 +19,13 @@ package vtgate import ( "flag" "fmt" - "golang.org/x/net/context" "math/rand" "sort" "strings" "sync" "time" + + "golang.org/x/net/context" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/flagutil" @@ -44,11 +45,6 @@ import ( ) var ( - _ = flag.String("gateway_implementation", "", "Deprecated") - initialTabletTimeout = flag.Duration("gateway_initial_tablet_timeout", 30*time.Second, "At startup, the gateway will wait up to that duration to get one tablet per keyspace/shard/tablettype") - // KeyspacesToWatch - if provided this specifies which keyspaces should be - // visible to a vtgate. By default the vtgate will allow access to any - // keyspace. cellsToWatch = flag.String("cells_to_watch", "", "comma-separated list of cells for watching tablets") refreshInterval = flag.Duration("tablet_refresh_interval", 1*time.Minute, "tablet refresh interval") refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes") @@ -56,17 +52,20 @@ var ( allowedTabletTypes []topodatapb.TabletType - tabletFilters flagutil.StringListValue - KeyspacesToWatch flagutil.StringListValue + tabletFilters flagutil.StringListValue +) + +const ( + gatewayImplementationDiscovery = "discoverygateway" ) func init() { - flag.Var(&KeyspacesToWatch, "keyspaces_to_watch", "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema") flag.Var(&tabletFilters, "tablet_filters", "Specifies a comma-separated list of 'keyspace|shard_name or keyrange' values to filter the tablets to watch") topoproto.TabletTypeListVar(&allowedTabletTypes, "allowed_tablet_types", "Specifies the tablet types this vtgate is allowed to route queries to") + RegisterGatewayCreator(gatewayImplementationDiscovery, createDiscoveryGateway) } -type tabletGateway struct { +type discoveryGateway struct { queryservice.QueryService hc discovery.HealthCheck tsc *discovery.TabletStatsCache @@ -88,7 +87,11 @@ type tabletGateway struct { buffer *buffer.Buffer } -func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string, retryCount int) *tabletGateway { +func createDiscoveryGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string, retryCount int) Gateway { + return NewDiscoveryGateway(ctx, hc, serv, cell, retryCount) +} + +func NewDiscoveryGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string, retryCount int) *discoveryGateway { var topoServer *topo.Server if serv != nil { var err error @@ -98,7 +101,7 @@ func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtop } } - dg := &tabletGateway{ + dg := &discoveryGateway{ hc: hc, tsc: discovery.NewTabletStatsCacheDoNotSetListener(topoServer, cell), srvTopoServer: serv, @@ -142,7 +145,7 @@ func NewTabletGateway(ctx context.Context, hc discovery.HealthCheck, serv srvtop // RegisterStats registers the stats to export the lag since the last refresh // and the checksum of the topology -func (dg *tabletGateway) RegisterStats() { +func (dg *discoveryGateway) RegisterStats() { stats.NewGaugeDurationFunc( "TopologyWatcherMaxRefreshLag", "maximum time since the topology watcher refreshed a cell", @@ -158,7 +161,7 @@ func (dg *tabletGateway) RegisterStats() { // topologyWatcherMaxRefreshLag returns the maximum lag since the watched // cells were refreshed from the topo server -func (dg *tabletGateway) topologyWatcherMaxRefreshLag() time.Duration { +func (dg *discoveryGateway) topologyWatcherMaxRefreshLag() time.Duration { var lag time.Duration for _, tw := range dg.tabletsWatchers { cellLag := tw.RefreshLag() @@ -170,7 +173,7 @@ func (dg *tabletGateway) topologyWatcherMaxRefreshLag() time.Duration { } // topologyWatcherChecksum returns a checksum of the topology watcher state -func (dg *tabletGateway) topologyWatcherChecksum() int64 { +func (dg *discoveryGateway) topologyWatcherChecksum() int64 { var checksum int64 for _, tw := range dg.tabletsWatchers { checksum = checksum ^ int64(tw.TopoChecksum()) @@ -180,7 +183,7 @@ func (dg *tabletGateway) topologyWatcherChecksum() int64 { // StatsUpdate forwards HealthCheck updates to TabletStatsCache and MasterBuffer. // It is part of the discovery.HealthCheckStatsListener interface. -func (dg *tabletGateway) StatsUpdate(ts *discovery.TabletStats) { +func (dg *discoveryGateway) StatsUpdate(ts *discovery.TabletStats) { dg.tsc.StatsUpdate(ts) if ts.Target.TabletType == topodatapb.TabletType_MASTER { @@ -189,7 +192,7 @@ func (dg *tabletGateway) StatsUpdate(ts *discovery.TabletStats) { } // WaitForTablets is part of the gateway.Gateway interface. -func (dg *tabletGateway) WaitForTablets(ctx context.Context, tabletTypesToWait []topodatapb.TabletType) error { +func (dg *discoveryGateway) WaitForTablets(ctx context.Context, tabletTypesToWait []topodatapb.TabletType) error { // Skip waiting for tablets if we are not told to do so. if len(tabletTypesToWait) == 0 { return nil @@ -206,7 +209,7 @@ func (dg *tabletGateway) WaitForTablets(ctx context.Context, tabletTypesToWait [ // Close shuts down underlying connections. // This function hides the inner implementation. -func (dg *tabletGateway) Close(ctx context.Context) error { +func (dg *discoveryGateway) Close(ctx context.Context) error { dg.buffer.Shutdown() for _, ctw := range dg.tabletsWatchers { ctw.Stop() @@ -216,7 +219,7 @@ func (dg *tabletGateway) Close(ctx context.Context) error { // CacheStatus returns a list of TabletCacheStatus per // keyspace/shard/tablet_type. -func (dg *tabletGateway) CacheStatus() TabletCacheStatusList { +func (dg *discoveryGateway) CacheStatus() TabletCacheStatusList { dg.mu.RLock() res := make(TabletCacheStatusList, 0, len(dg.statusAggregators)) for _, aggr := range dg.statusAggregators { @@ -232,7 +235,7 @@ func (dg *tabletGateway) CacheStatus() TabletCacheStatusList { // the middle of a transaction. While returning the error check if it maybe a result of // a resharding event, and set the re-resolve bit and let the upper layers // re-resolve and retry. -func (dg *tabletGateway) withRetry(ctx context.Context, target *querypb.Target, unused queryservice.QueryService, name string, inTransaction bool, inner func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error)) error { +func (dg *discoveryGateway) withRetry(ctx context.Context, target *querypb.Target, unused queryservice.QueryService, name string, inTransaction bool, inner func(ctx context.Context, target *querypb.Target, conn queryservice.QueryService) (bool, error)) error { var tabletLastUsed *topodatapb.Tablet var err error invalidTablets := make(map[string]bool) @@ -371,13 +374,13 @@ func nextTablet(cell string, tablets []discovery.TabletStats, offset, length int return -1 } -func (dg *tabletGateway) updateStats(target *querypb.Target, startTime time.Time, err error) { +func (dg *discoveryGateway) updateStats(target *querypb.Target, startTime time.Time, err error) { elapsed := time.Since(startTime) aggr := dg.getStatsAggregator(target) aggr.UpdateQueryInfo("", target.TabletType, elapsed, err != nil) } -func (dg *tabletGateway) getStatsAggregator(target *querypb.Target) *TabletStatusAggregator { +func (dg *discoveryGateway) getStatsAggregator(target *querypb.Target) *TabletStatusAggregator { key := fmt.Sprintf("%v/%v/%v", target.Keyspace, target.Shard, target.TabletType.String()) // get existing aggregator @@ -399,32 +402,6 @@ func (dg *tabletGateway) getStatsAggregator(target *querypb.Target) *TabletStatu return aggr } -// WaitForTablets is a helper method to wait for the provided tablets, -// up until the *initialTabletTimeout. It will log what it is doing. -// Note it has the same name as the tabletGateway's method, as it -// just calls it. -func WaitForTablets(gw *tabletGateway, tabletTypesToWait []topodatapb.TabletType) error { - log.Infof("Gateway waiting for serving tablets of types %v ...", tabletTypesToWait) - ctx, cancel := context.WithTimeout(context.Background(), *initialTabletTimeout) - defer cancel() - - err := gw.WaitForTablets(ctx, tabletTypesToWait) - switch err { - case nil: - // Log so we know everything is fine. - log.Infof("Waiting for tablets completed") - case context.DeadlineExceeded: - // In this scenario, we were able to reach the - // topology service, but some tablets may not be - // ready. We just warn and keep going. - log.Warningf("Timeout waiting for all keyspaces / shards to have healthy tablets of types %v, may be in degraded mode", tabletTypesToWait) - err = nil - default: - // Nothing to do here, the caller will log.Fatalf. - } - return err -} - // NewShardError returns a new error with the shard info amended. func NewShardError(in error, target *querypb.Target, tablet *topodatapb.Tablet) error { if in == nil { diff --git a/go/vt/vtgate/tabletgateway_test.go b/go/vt/vtgate/discoverygateway_test.go similarity index 90% rename from go/vt/vtgate/tabletgateway_test.go rename to go/vt/vtgate/discoverygateway_test.go index ed231fbbee9..ce7508f96e1 100644 --- a/go/vt/vtgate/tabletgateway_test.go +++ b/go/vt/vtgate/discoverygateway_test.go @@ -18,10 +18,11 @@ package vtgate import ( "fmt" - "golang.org/x/net/context" "strings" "testing" + "golang.org/x/net/context" + "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/discovery" "vitess.io/vitess/go/vt/srvtopo/srvtopotest" @@ -36,23 +37,23 @@ import ( ) func TestDiscoveryGatewayExecute(t *testing.T) { - testDiscoveryGatewayGeneric(t, func(dg *tabletGateway, target *querypb.Target) error { + testDiscoveryGatewayGeneric(t, func(dg *discoveryGateway, target *querypb.Target) error { _, err := dg.Execute(context.Background(), target, "query", nil, 0, nil) return err }) - testDiscoveryGatewayTransact(t, func(dg *tabletGateway, target *querypb.Target) error { + testDiscoveryGatewayTransact(t, func(dg *discoveryGateway, target *querypb.Target) error { _, err := dg.Execute(context.Background(), target, "query", nil, 1, nil) return err }) } func TestDiscoveryGatewayExecuteBatch(t *testing.T) { - testDiscoveryGatewayGeneric(t, func(dg *tabletGateway, target *querypb.Target) error { + testDiscoveryGatewayGeneric(t, func(dg *discoveryGateway, target *querypb.Target) error { queries := []*querypb.BoundQuery{{Sql: "query", BindVariables: nil}} _, err := dg.ExecuteBatch(context.Background(), target, queries, false, 0, nil) return err }) - testDiscoveryGatewayTransact(t, func(dg *tabletGateway, target *querypb.Target) error { + testDiscoveryGatewayTransact(t, func(dg *discoveryGateway, target *querypb.Target) error { queries := []*querypb.BoundQuery{{Sql: "query", BindVariables: nil}} _, err := dg.ExecuteBatch(context.Background(), target, queries, false, 1, nil) return err @@ -60,7 +61,7 @@ func TestDiscoveryGatewayExecuteBatch(t *testing.T) { } func TestDiscoveryGatewayExecuteStream(t *testing.T) { - testDiscoveryGatewayGeneric(t, func(dg *tabletGateway, target *querypb.Target) error { + testDiscoveryGatewayGeneric(t, func(dg *discoveryGateway, target *querypb.Target) error { err := dg.StreamExecute(context.Background(), target, "query", nil, 0, nil, func(qr *sqltypes.Result) error { return nil }) @@ -69,33 +70,33 @@ func TestDiscoveryGatewayExecuteStream(t *testing.T) { } func TestDiscoveryGatewayBegin(t *testing.T) { - testDiscoveryGatewayGeneric(t, func(dg *tabletGateway, target *querypb.Target) error { + testDiscoveryGatewayGeneric(t, func(dg *discoveryGateway, target *querypb.Target) error { _, err := dg.Begin(context.Background(), target, nil) return err }) } func TestDiscoveryGatewayCommit(t *testing.T) { - testDiscoveryGatewayTransact(t, func(dg *tabletGateway, target *querypb.Target) error { + testDiscoveryGatewayTransact(t, func(dg *discoveryGateway, target *querypb.Target) error { return dg.Commit(context.Background(), target, 1) }) } func TestDiscoveryGatewayRollback(t *testing.T) { - testDiscoveryGatewayTransact(t, func(dg *tabletGateway, target *querypb.Target) error { + testDiscoveryGatewayTransact(t, func(dg *discoveryGateway, target *querypb.Target) error { return dg.Rollback(context.Background(), target, 1) }) } func TestDiscoveryGatewayBeginExecute(t *testing.T) { - testDiscoveryGatewayGeneric(t, func(dg *tabletGateway, target *querypb.Target) error { + testDiscoveryGatewayGeneric(t, func(dg *discoveryGateway, target *querypb.Target) error { _, _, err := dg.BeginExecute(context.Background(), target, "query", nil, nil) return err }) } func TestDiscoveryGatewayBeginExecuteBatch(t *testing.T) { - testDiscoveryGatewayGeneric(t, func(dg *tabletGateway, target *querypb.Target) error { + testDiscoveryGatewayGeneric(t, func(dg *discoveryGateway, target *querypb.Target) error { queries := []*querypb.BoundQuery{{Sql: "query", BindVariables: nil}} _, _, err := dg.BeginExecuteBatch(context.Background(), target, queries, false, nil) return err @@ -106,7 +107,7 @@ func TestDiscoveryGatewayGetTablets(t *testing.T) { keyspace := "ks" shard := "0" hc := discovery.NewFakeHealthCheck() - dg := NewTabletGateway(context.Background(), hc, nil, "local", 2) + dg := NewDiscoveryGateway(context.Background(), hc, nil, "local", 2) // replica should only use local ones hc.Reset() @@ -214,7 +215,7 @@ func TestDiscoveryGatewayGetTabletsInRegion(t *testing.T) { Cells: []string{"local-west", "local-east"}, } - dg := NewTabletGateway(context.Background(), hc, srvTopo, "local-west", 2) + dg := NewDiscoveryGateway(context.Background(), hc, srvTopo, "local-west", 2) ts.CreateCellsAlias(context.Background(), "local", cellsAlias) @@ -244,7 +245,7 @@ func TestDiscoveryGatewayGetTabletsWithRegion(t *testing.T) { Cells: []string{"local-west", "local-east"}, } - dg := NewTabletGateway(context.Background(), hc, srvTopo, "local", 2) + dg := NewDiscoveryGateway(context.Background(), hc, srvTopo, "local", 2) ts.CreateCellsAlias(context.Background(), "local", cellsAlias) @@ -263,7 +264,7 @@ func TestDiscoveryGatewayGetTabletsWithRegion(t *testing.T) { } } -func testDiscoveryGatewayGeneric(t *testing.T, f func(dg *tabletGateway, target *querypb.Target) error) { +func testDiscoveryGatewayGeneric(t *testing.T, f func(dg *discoveryGateway, target *querypb.Target) error) { keyspace := "ks" shard := "0" tabletType := topodatapb.TabletType_REPLICA @@ -273,7 +274,7 @@ func testDiscoveryGatewayGeneric(t *testing.T, f func(dg *tabletGateway, target TabletType: tabletType, } hc := discovery.NewFakeHealthCheck() - dg := NewTabletGateway(context.Background(), hc, nil, "cell", 2) + dg := NewDiscoveryGateway(context.Background(), hc, nil, "cell", 2) // no tablet hc.Reset() @@ -346,7 +347,7 @@ func testDiscoveryGatewayGeneric(t *testing.T, f func(dg *tabletGateway, target } } -func testDiscoveryGatewayTransact(t *testing.T, f func(dg *tabletGateway, target *querypb.Target) error) { +func testDiscoveryGatewayTransact(t *testing.T, f func(dg *discoveryGateway, target *querypb.Target) error) { keyspace := "ks" shard := "0" tabletType := topodatapb.TabletType_REPLICA @@ -356,7 +357,7 @@ func testDiscoveryGatewayTransact(t *testing.T, f func(dg *tabletGateway, target TabletType: tabletType, } hc := discovery.NewFakeHealthCheck() - dg := NewTabletGateway(context.Background(), hc, nil, "cell", 2) + dg := NewDiscoveryGateway(context.Background(), hc, nil, "cell", 2) // retry error - no retry hc.Reset() diff --git a/go/vt/vtgate/gateway.go b/go/vt/vtgate/gateway.go new file mode 100644 index 00000000000..58cb77f27a6 --- /dev/null +++ b/go/vt/vtgate/gateway.go @@ -0,0 +1,119 @@ +/* +Copyright 2019 The Vitess Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vtgate + +import ( + "flag" + "time" + + "golang.org/x/net/context" + "vitess.io/vitess/go/flagutil" + "vitess.io/vitess/go/vt/log" + + "vitess.io/vitess/go/vt/discovery" + "vitess.io/vitess/go/vt/srvtopo" + "vitess.io/vitess/go/vt/vttablet/queryservice" + + topodatapb "vitess.io/vitess/go/vt/proto/topodata" +) + +// This file contains the Gateway interface definition, and the +// implementations registry. +// A Gateway can take +// a query targeted to a keyspace/shard/tablet_type and send it off. + +var ( + implementation = flag.String("gateway_implementation", "discoverygateway", "The implementation of gateway") + initialTabletTimeout = flag.Duration("gateway_initial_tablet_timeout", 30*time.Second, "At startup, the gateway will wait up to that duration to get one tablet per keyspace/shard/tablettype") + + // KeyspacesToWatch - if provided this specifies which keyspaces should be + // visible to a vtgate. By default the vtgate will allow access to any + // keyspace. + KeyspacesToWatch flagutil.StringListValue +) + +func init() { + flag.Var(&KeyspacesToWatch, "keyspaces_to_watch", "Specifies which keyspaces this vtgate should have access to while routing queries or accessing the vschema") +} + +// A Gateway is the query processing module for each shard, +// which is used by ScatterConn. +type Gateway interface { + // the query service that this Gateway wraps around + queryservice.QueryService + + // WaitForTablets asks the gateway to wait for the provided + // tablets types to be available. It the context is canceled + // before the end, it should return ctx.Err(). + // The error returned will have specific effects: + // - nil: keep going with startup. + // - context.DeadlineExceeded: log a warning that we didn't get + // all tablets, and keep going with startup. + // - any other error: log.Fatalf out. + WaitForTablets(ctx context.Context, tabletTypesToWait []topodatapb.TabletType) error + + // RegisterStats registers exported stats for the gateway + RegisterStats() + + // CacheStatus returns a list of TabletCacheStatus per shard / tablet type. + CacheStatus() TabletCacheStatusList +} + +// Creator is the factory method which can create the actual gateway object. +type Creator func(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, cell string, retryCount int) Gateway + +var creators = make(map[string]Creator) + +// RegisterGatewayCreator registers a Creator with given name. +func RegisterGatewayCreator(name string, gc Creator) { + if _, ok := creators[name]; ok { + log.Fatalf("Gateway %s already exists", name) + } + creators[name] = gc +} + +// GetGatewayCreator returns the Creator specified by the gateway_implementation flag. +func GetGatewayCreator() Creator { + gc, ok := creators[*implementation] + if !ok { + log.Exitf("No gateway registered as %s", *implementation) + } + return gc +} + +// WaitForTablets is a helper method to wait for the provided tablets, +// up until the *initialTabletTimeout. It will log what it is doing. +// Note it has the same name as the Gateway's interface method, as it +// just calls it. +func WaitForTablets(gw Gateway, tabletTypesToWait []topodatapb.TabletType) error { + log.Infof("Gateway waiting for serving tablets of types %v ...", tabletTypesToWait) + ctx, cancel := context.WithTimeout(context.Background(), *initialTabletTimeout) + defer cancel() + + err := gw.WaitForTablets(ctx, tabletTypesToWait) + switch err { + case nil: + // Log so we know everything is fine. + log.Infof("Waiting for tablets completed") + case context.DeadlineExceeded: + // In this scenario, we were able to reach the + // topology service, but some tablets may not be + // ready. We just warn and keep going. + log.Warningf("Timeout waiting for all keyspaces / shards to have healthy tablets of types %v, may be in degraded mode", tabletTypesToWait) + err = nil + default: + // Nothing to do here, the caller will log.Fatalf. + } + return err +} diff --git a/go/vt/vtgate/gateway_test_suite.go b/go/vt/vtgate/gateway_test_suite.go index db5c69bb62b..6241399a21d 100644 --- a/go/vt/vtgate/gateway_test_suite.go +++ b/go/vt/vtgate/gateway_test_suite.go @@ -14,7 +14,9 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package gatewaytest contains a test suite to run against a Gateway object. +package vtgate + +// This file contains a test suite to run against a Gateway object. // We re-use the tabletconn test suite, as it tests all queries and parameters // go through. There are two exceptions: // - the health check: we just make that one work, so the gateway knows the @@ -22,7 +24,6 @@ limitations under the License. // - the error type returned: it's not a TabletError any more, but a ShardError. // We still check the error code is correct though which is really all we care // about. -package vtgate import ( "testing" @@ -79,7 +80,7 @@ func CreateFakeServers(t *testing.T) (*tabletconntest.FakeQueryService, *topo.Se // gatewayAdapter implements the TabletConn interface, but sends the // queries to the Gateway. type gatewayAdapter struct { - *tabletGateway + Gateway } // Close should be overridden to make sure we don't close the underlying Gateway. @@ -91,12 +92,12 @@ func (ga *gatewayAdapter) Close(ctx context.Context) error { // gateway needs to be configured with one established connection for // tabletconntest.TestTarget.{Keyspace, Shard, TabletType} to the // provided tabletconntest.FakeQueryService. -func TestSuite(t *testing.T, name string, g *tabletGateway, f *tabletconntest.FakeQueryService) { +func TestSuite(t *testing.T, name string, g Gateway, f *tabletconntest.FakeQueryService) { protocolName := "gateway-test-" + name tabletconn.RegisterDialer(protocolName, func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast) (queryservice.QueryService, error) { - return &gatewayAdapter{tabletGateway: g}, nil + return &gatewayAdapter{Gateway: g}, nil }) tabletconntest.TestSuite(t, protocolName, &topodatapb.Tablet{ diff --git a/go/vt/vtgate/grpc_discovery_test.go b/go/vt/vtgate/grpc_discovery_test.go index b6aa62633d4..03d22a810ad 100644 --- a/go/vt/vtgate/grpc_discovery_test.go +++ b/go/vt/vtgate/grpc_discovery_test.go @@ -18,11 +18,12 @@ package vtgate import ( "flag" - "golang.org/x/net/context" "net" "testing" "time" + "golang.org/x/net/context" + "google.golang.org/grpc" "vitess.io/vitess/go/vt/discovery" @@ -63,7 +64,7 @@ func TestGRPCDiscovery(t *testing.T) { // Wait for the right tablets to be present. hc := discovery.NewHealthCheck(10*time.Second, 2*time.Minute) rs := srvtopo.NewResilientServer(ts, "TestGRPCDiscovery") - dg := NewTabletGateway(context.Background(), hc, rs, cell, 2) + dg := NewDiscoveryGateway(context.Background(), hc, rs, cell, 2) hc.AddTablet(&topodatapb.Tablet{ Alias: &topodatapb.TabletAlias{ Cell: cell, diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index bce0a26425a..d3657aab4c7 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -47,7 +47,7 @@ type ScatterConn struct { timings *stats.MultiTimings tabletCallErrorCount *stats.CountersWithMultiLabels txConn *TxConn - gateway *tabletGateway + gateway Gateway healthCheck discovery.HealthCheck } @@ -69,7 +69,7 @@ type shardActionFunc func(rs *srvtopo.ResolvedShard, i int) error type shardActionTransactionFunc func(rs *srvtopo.ResolvedShard, i int, shouldBegin bool, transactionID int64) (int64, error) // NewScatterConn creates a new ScatterConn. -func NewScatterConn(statsName string, txConn *TxConn, gw *tabletGateway, hc discovery.HealthCheck) *ScatterConn { +func NewScatterConn(statsName string, txConn *TxConn, gw Gateway, hc discovery.HealthCheck) *ScatterConn { tabletCallErrorCountStatsName := "" if statsName != "" { tabletCallErrorCountStatsName = statsName + "ErrorCount" diff --git a/go/vt/vtgate/scatter_conn_test.go b/go/vt/vtgate/scatter_conn_test.go index 6267272e2ef..6facc087716 100644 --- a/go/vt/vtgate/scatter_conn_test.go +++ b/go/vt/vtgate/scatter_conn_test.go @@ -652,7 +652,7 @@ func newTestScatterConn(hc discovery.HealthCheck, serv srvtopo.Server, cell stri // The topo.Server is used to start watching the cells described // in '-cells_to_watch' command line parameter, which is // empty by default. So it's unused in this test, set to nil. - gw := NewTabletGateway(context.Background(), hc, serv, cell, 3) + gw := GetGatewayCreator()(context.Background(), hc, serv, cell, 3) tc := NewTxConn(gw, vtgatepb.TransactionMode_TWOPC) return NewScatterConn("", tc, gw, hc) } diff --git a/go/vt/vtgate/status_test.go b/go/vt/vtgate/status_test.go index 8472faae5d2..3c420e68bae 100644 --- a/go/vt/vtgate/status_test.go +++ b/go/vt/vtgate/status_test.go @@ -22,6 +22,7 @@ import ( "reflect" "testing" "time" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) diff --git a/go/vt/vtgate/tx_conn.go b/go/vt/vtgate/tx_conn.go index 1cf7b4c0c93..ae8926f16b0 100644 --- a/go/vt/vtgate/tx_conn.go +++ b/go/vt/vtgate/tx_conn.go @@ -33,12 +33,12 @@ import ( // TxConn is used for executing transactional requests. type TxConn struct { - gateway *tabletGateway + gateway Gateway mode vtgatepb.TransactionMode } // NewTxConn builds a new TxConn. -func NewTxConn(gw *tabletGateway, txMode vtgatepb.TransactionMode) *TxConn { +func NewTxConn(gw Gateway, txMode vtgatepb.TransactionMode) *TxConn { return &TxConn{ gateway: gw, mode: txMode, diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 44eead7beac..94aeea2a1ed 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -873,7 +873,7 @@ func TestResolveVStreamParams(t *testing.T) { } func newTestVStreamManager(hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vstreamManager { - gw := NewTabletGateway(context.Background(), hc, serv, cell, 3) + gw := NewDiscoveryGateway(context.Background(), hc, serv, cell, 3) srvResolver := srvtopo.NewResolver(serv, gw, cell) return newVStreamManager(srvResolver, serv, cell) } diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 662dd64d6ac..7f0626673c5 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -101,7 +101,7 @@ type VTGate struct { resolver *Resolver vsm *vstreamManager txConn *TxConn - gw *tabletGateway + gw Gateway // stats objects. // TODO(sougou): This needs to be cleaned up. There @@ -133,7 +133,7 @@ func Init(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, ce // Build objects from low to high level. // Start with the gateway. If we can't reach the topology service, // we can't go on much further, so we log.Fatal out. - gw := NewTabletGateway(ctx, hc, serv, cell, retryCount) + gw := GetGatewayCreator()(ctx, hc, serv, cell, retryCount) gw.RegisterStats() if err := WaitForTablets(gw, tabletTypesToWait); err != nil { log.Fatalf("gateway.WaitForTablets failed: %v", err) @@ -227,7 +227,7 @@ func (vtg *VTGate) IsHealthy() error { } // Gateway returns the current gateway implementation. Mostly used for tests. -func (vtg *VTGate) Gateway() *tabletGateway { +func (vtg *VTGate) Gateway() Gateway { return vtg.gw } diff --git a/go/vt/vttablet/queryservice/wrapped.go b/go/vt/vttablet/queryservice/wrapped.go index 9443951d6fe..185046dd9df 100644 --- a/go/vt/vttablet/queryservice/wrapped.go +++ b/go/vt/vttablet/queryservice/wrapped.go @@ -39,7 +39,7 @@ type WrapperFunc func(ctx context.Context, target *querypb.Target, conn QuerySer // Wrap returns a wrapped version of the original QueryService implementation. // This lets you avoid repeating boiler-plate code by consolidating it in the // wrapper function. -// A good example of this is go/vt/vtgate/gateway/tabletgateway.go. +// A good example of this is go/vt/vtgate/gateway/discoverygateway.go. // For every method invocation, the wrapper function is called, which can // in turn call the provided inner function that will use the input parameters // to call the implementation. In order to load balance across multiple From 565d0928610952aa36c11e4c864ccf65347e1dcf Mon Sep 17 00:00:00 2001 From: deepthi Date: Tue, 31 Mar 2020 09:20:13 -0700 Subject: [PATCH 2/2] address review comments Signed-off-by: deepthi --- go/vt/vtexplain/vtexplain_vtgate.go | 2 +- go/vt/vtgate/gateway.go | 4 ++-- go/vt/vtgate/scatter_conn_test.go | 2 +- go/vt/vtgate/vtgate.go | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go index 97098b0db34..95259873f67 100644 --- a/go/vt/vtexplain/vtexplain_vtgate.go +++ b/go/vt/vtexplain/vtexplain_vtgate.go @@ -72,7 +72,7 @@ func initVtgateExecutor(vSchemaStr string, opts *Options) error { func newFakeResolver(opts *Options, hc discovery.HealthCheck, serv srvtopo.Server, cell string) *vtgate.Resolver { ctx := context.Background() - gw := vtgate.GetGatewayCreator()(ctx, hc, serv, cell, 3) + gw := vtgate.GatewayCreator()(ctx, hc, serv, cell, 3) gw.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_REPLICA}) txMode := vtgatepb.TransactionMode_MULTI diff --git a/go/vt/vtgate/gateway.go b/go/vt/vtgate/gateway.go index 58cb77f27a6..8bf30833372 100644 --- a/go/vt/vtgate/gateway.go +++ b/go/vt/vtgate/gateway.go @@ -83,8 +83,8 @@ func RegisterGatewayCreator(name string, gc Creator) { creators[name] = gc } -// GetGatewayCreator returns the Creator specified by the gateway_implementation flag. -func GetGatewayCreator() Creator { +// GatewayCreator returns the Creator specified by the gateway_implementation flag. +func GatewayCreator() Creator { gc, ok := creators[*implementation] if !ok { log.Exitf("No gateway registered as %s", *implementation) diff --git a/go/vt/vtgate/scatter_conn_test.go b/go/vt/vtgate/scatter_conn_test.go index 6facc087716..ede85cc5cfc 100644 --- a/go/vt/vtgate/scatter_conn_test.go +++ b/go/vt/vtgate/scatter_conn_test.go @@ -652,7 +652,7 @@ func newTestScatterConn(hc discovery.HealthCheck, serv srvtopo.Server, cell stri // The topo.Server is used to start watching the cells described // in '-cells_to_watch' command line parameter, which is // empty by default. So it's unused in this test, set to nil. - gw := GetGatewayCreator()(context.Background(), hc, serv, cell, 3) + gw := GatewayCreator()(context.Background(), hc, serv, cell, 3) tc := NewTxConn(gw, vtgatepb.TransactionMode_TWOPC) return NewScatterConn("", tc, gw, hc) } diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index 7f0626673c5..0a92c6dbf95 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -133,7 +133,7 @@ func Init(ctx context.Context, hc discovery.HealthCheck, serv srvtopo.Server, ce // Build objects from low to high level. // Start with the gateway. If we can't reach the topology service, // we can't go on much further, so we log.Fatal out. - gw := GetGatewayCreator()(ctx, hc, serv, cell, retryCount) + gw := GatewayCreator()(ctx, hc, serv, cell, retryCount) gw.RegisterStats() if err := WaitForTablets(gw, tabletTypesToWait); err != nil { log.Fatalf("gateway.WaitForTablets failed: %v", err)