diff --git a/go/vt/vttablet/heartbeat/reader.go b/go/vt/vttablet/heartbeat/reader.go index 90de44261b1..65c25805253 100644 --- a/go/vt/vttablet/heartbeat/reader.go +++ b/go/vt/vttablet/heartbeat/reader.go @@ -28,7 +28,6 @@ import ( "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/timer" - "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/sqlparser" @@ -49,7 +48,7 @@ const ( // table against the current time at read time. This value is reported in metrics and // also to the healthchecks. type Reader struct { - dbconfigs *dbconfigs.DBConfigs + env tabletenv.Env enabled bool interval time.Duration @@ -69,33 +68,30 @@ type Reader struct { } // NewReader returns a new heartbeat reader. -func NewReader(checker connpool.MySQLChecker, config tabletenv.TabletConfig) *Reader { +func NewReader(env tabletenv.Env) *Reader { + config := env.Config() if !config.HeartbeatEnable { return &Reader{} } return &Reader{ + env: env, enabled: true, now: time.Now, interval: config.HeartbeatInterval, ticks: timer.NewTimer(config.HeartbeatInterval), errorLog: logutil.NewThrottledLogger("HeartbeatReporter", 60*time.Second), - pool: connpool.New(config.PoolNamePrefix+"HeartbeatReadPool", 1, 0, time.Duration(config.IdleTimeout*1e9), checker), + pool: connpool.New(env, config.PoolNamePrefix+"HeartbeatReadPool", 1, 0, time.Duration(config.IdleTimeout*1e9)), } } -// InitDBConfig must be called before Init. -func (r *Reader) InitDBConfig(dbcfgs *dbconfigs.DBConfigs) { - r.dbconfigs = dbcfgs -} - // Init does last minute initialization of db settings, such as dbName // and keyspaceShard func (r *Reader) Init(target querypb.Target) { if !r.enabled { return } - r.dbName = sqlescape.EscapeID(r.dbconfigs.SidecarDBName.Get()) + r.dbName = sqlescape.EscapeID(r.env.DBConfigs().SidecarDBName.Get()) r.keyspaceShard = fmt.Sprintf("%s:%s", target.Keyspace, target.Shard) } @@ -112,7 +108,7 @@ func (r *Reader) Open() { } log.Info("Beginning heartbeat reads") - r.pool.Open(r.dbconfigs.AppWithDB(), r.dbconfigs.DbaWithDB(), r.dbconfigs.AppDebugWithDB()) + r.pool.Open(r.env.DBConfigs().AppWithDB(), r.env.DBConfigs().DbaWithDB(), r.env.DBConfigs().AppDebugWithDB()) r.ticks.Start(func() { r.readHeartbeat() }) r.isOpen = true } diff --git a/go/vt/vttablet/heartbeat/reader_test.go b/go/vt/vttablet/heartbeat/reader_test.go index 152a28a9b7c..d956e93aa0b 100644 --- a/go/vt/vttablet/heartbeat/reader_test.go +++ b/go/vt/vttablet/heartbeat/reader_test.go @@ -110,7 +110,7 @@ func newReader(db *fakesqldb.DB, nowFunc func() time.Time) *Reader { cp := *params dbc := dbconfigs.NewTestDBConfigs(cp, cp, "") - tr := NewReader(&fakeMysqlChecker{}, config) + tr := NewReader(tabletenv.NewTestEnv(&config, nil)) tr.dbName = sqlescape.EscapeID(dbc.SidecarDBName.Get()) tr.keyspaceShard = "test:0" tr.now = nowFunc diff --git a/go/vt/vttablet/heartbeat/writer.go b/go/vt/vttablet/heartbeat/writer.go index a9dbae08799..34cbcee58c0 100644 --- a/go/vt/vttablet/heartbeat/writer.go +++ b/go/vt/vttablet/heartbeat/writer.go @@ -56,7 +56,7 @@ const ( // Writer runs on master tablets and writes heartbeats to the _vt.heartbeat // table at a regular interval, defined by heartbeat_interval. type Writer struct { - dbconfigs *dbconfigs.DBConfigs + env tabletenv.Env enabled bool interval time.Duration @@ -73,26 +73,23 @@ type Writer struct { } // NewWriter creates a new Writer. -func NewWriter(checker connpool.MySQLChecker, alias topodatapb.TabletAlias, config tabletenv.TabletConfig) *Writer { +func NewWriter(env tabletenv.Env, alias topodatapb.TabletAlias) *Writer { + config := env.Config() if !config.HeartbeatEnable { return &Writer{} } return &Writer{ + env: env, enabled: true, tabletAlias: alias, now: time.Now, interval: config.HeartbeatInterval, ticks: timer.NewTimer(config.HeartbeatInterval), errorLog: logutil.NewThrottledLogger("HeartbeatWriter", 60*time.Second), - pool: connpool.New(config.PoolNamePrefix+"HeartbeatWritePool", 1, 0, time.Duration(config.IdleTimeout*1e9), checker), + pool: connpool.New(env, config.PoolNamePrefix+"HeartbeatWritePool", 1, 0, time.Duration(config.IdleTimeout*1e9)), } } -// InitDBConfig must be called before Init. -func (w *Writer) InitDBConfig(dbcfgs *dbconfigs.DBConfigs) { - w.dbconfigs = dbcfgs -} - // Init runs at tablet startup and last minute initialization of db settings, and // creates the necessary tables for heartbeat. func (w *Writer) Init(target querypb.Target) error { @@ -102,10 +99,10 @@ func (w *Writer) Init(target querypb.Target) error { w.mu.Lock() defer w.mu.Unlock() log.Info("Initializing heartbeat table.") - w.dbName = sqlescape.EscapeID(w.dbconfigs.SidecarDBName.Get()) + w.dbName = sqlescape.EscapeID(w.env.DBConfigs().SidecarDBName.Get()) w.keyspaceShard = fmt.Sprintf("%s:%s", target.Keyspace, target.Shard) - err := w.initializeTables(w.dbconfigs.DbaWithDB()) + err := w.initializeTables(w.env.DBConfigs().DbaWithDB()) if err != nil { w.recordError(err) return err @@ -128,7 +125,7 @@ func (w *Writer) Open() { return } log.Info("Beginning heartbeat writes") - w.pool.Open(w.dbconfigs.AppWithDB(), w.dbconfigs.DbaWithDB(), w.dbconfigs.AppDebugWithDB()) + w.pool.Open(w.env.DBConfigs().AppWithDB(), w.env.DBConfigs().DbaWithDB(), w.env.DBConfigs().AppDebugWithDB()) w.ticks.Start(func() { w.writeHeartbeat() }) w.isOpen = true } diff --git a/go/vt/vttablet/heartbeat/writer_test.go b/go/vt/vttablet/heartbeat/writer_test.go index 0b98e0ef2ed..30900fad3b1 100644 --- a/go/vt/vttablet/heartbeat/writer_test.go +++ b/go/vt/vttablet/heartbeat/writer_test.go @@ -115,9 +115,7 @@ func newTestWriter(db *fakesqldb.DB, nowFunc func() time.Time) *Writer { cp := *params dbc := dbconfigs.NewTestDBConfigs(cp, cp, "") - tw := NewWriter(&fakeMysqlChecker{}, - topodatapb.TabletAlias{Cell: "test", Uid: 1111}, - config) + tw := NewWriter(tabletenv.NewTestEnv(&config, nil), topodatapb.TabletAlias{Cell: "test", Uid: 1111}) tw.dbName = sqlescape.EscapeID(dbc.SidecarDBName.Get()) tw.keyspaceShard = "test:0" tw.now = nowFunc @@ -125,7 +123,3 @@ func newTestWriter(db *fakesqldb.DB, nowFunc func() time.Time) *Writer { return tw } - -type fakeMysqlChecker struct{} - -func (f fakeMysqlChecker) CheckMySQL() {} diff --git a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go index a87f6fe0dfa..31b5bf86547 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/framework_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/framework_test.go @@ -87,8 +87,7 @@ func TestMain(m *testing.M) { // engines cannot be initialized in testenv because it introduces // circular dependencies. - streamerEngine = vstreamer.NewEngine(env.SrvTopo, env.SchemaEngine) - streamerEngine.InitDBConfig(env.Dbcfgs.DbaWithDB()) + streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine) streamerEngine.Open(env.KeyspaceName, env.Cells[0]) defer streamerEngine.Close() diff --git a/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go b/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go index 783f043b94a..53d585a5997 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go @@ -28,7 +28,6 @@ import ( "vitess.io/vitess/go/vt/grpcclient" "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/tabletconn" - "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" @@ -158,7 +157,8 @@ func (vsClient *MySQLVStreamerClient) Open(ctx context.Context) (err error) { // Let's create all the required components by vstreamer - vsClient.sourceSe = schema.NewEngine(checker{}, tabletenv.DefaultQsConfig) + config := tabletenv.DefaultQsConfig + vsClient.sourceSe = schema.NewEngine(tabletenv.NewTestEnv(&config, nil)) vsClient.sourceSe.InitDBConfig(vsClient.sourceConnParams) err = vsClient.sourceSe.Open() if err != nil { @@ -210,9 +210,3 @@ func (vsClient *MySQLVStreamerClient) VStreamRows(ctx context.Context, query str func InitVStreamerClient(cfg *dbconfigs.DBConfigs) { dbcfgs = cfg } - -type checker struct{} - -var _ = connpool.MySQLChecker(checker{}) - -func (checker) CheckMySQL() {} diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn.go b/go/vt/vttablet/tabletserver/connpool/dbconn.go index ef39b8e193b..2ffe8c308d9 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn.go @@ -67,7 +67,7 @@ func NewDBConn( appParams dbconfigs.Connector) (*DBConn, error) { c, err := dbconnpool.NewDBConnection(appParams, tabletenv.MySQLStats) if err != nil { - cp.checker.CheckMySQL() + cp.env.CheckMySQL() return nil, err } return &DBConn{ @@ -120,7 +120,7 @@ func (dbc *DBConn) Exec(ctx context.Context, query string, maxrows int, wantfiel } if reconnectErr := dbc.reconnect(); reconnectErr != nil { - dbc.pool.checker.CheckMySQL() + dbc.pool.env.CheckMySQL() // Return the error of the reconnect and not the original connection error. return nil, reconnectErr } @@ -201,7 +201,7 @@ func (dbc *DBConn) Stream(ctx context.Context, query string, callback func(*sqlt default: } if reconnectErr := dbc.reconnect(); reconnectErr != nil { - dbc.pool.checker.CheckMySQL() + dbc.pool.env.CheckMySQL() // Return the error of the reconnect and not the original connection error. return reconnectErr } diff --git a/go/vt/vttablet/tabletserver/connpool/pool.go b/go/vt/vttablet/tabletserver/connpool/pool.go index aa48333ab92..9fe1aae70c7 100644 --- a/go/vt/vttablet/tabletserver/connpool/pool.go +++ b/go/vt/vttablet/tabletserver/connpool/pool.go @@ -45,12 +45,6 @@ var ErrConnPoolClosed = vterrors.New(vtrpcpb.Code_INTERNAL, "internal error: une // through non-test code. var usedNames = make(map[string]bool) -// MySQLChecker defines the CheckMySQL interface that lower -// level objects can use to call back into TabletServer. -type MySQLChecker interface { - CheckMySQL() -} - // Pool implements a custom connection pool for tabletserver. // It's similar to dbconnpool.ConnPool, but the connections it creates // come with built-in ability to kill in-flight queries. These connections @@ -58,6 +52,7 @@ type MySQLChecker interface { // Other than the connection type, ConnPool maintains an additional // pool of dba connections that are used to kill connections. type Pool struct { + env tabletenv.Env name string mu sync.Mutex connections *pools.ResourcePool @@ -65,25 +60,19 @@ type Pool struct { prefillParallelism int idleTimeout time.Duration dbaPool *dbconnpool.ConnectionPool - checker MySQLChecker appDebugParams dbconfigs.Connector } // New creates a new Pool. The name is used // to publish stats only. -func New( - name string, - capacity int, - prefillParallelism int, - idleTimeout time.Duration, - checker MySQLChecker) *Pool { +func New(env tabletenv.Env, name string, capacity int, prefillParallelism int, idleTimeout time.Duration) *Pool { cp := &Pool{ + env: env, name: name, capacity: capacity, prefillParallelism: prefillParallelism, idleTimeout: idleTimeout, dbaPool: dbconnpool.NewConnectionPool("", 1, idleTimeout, 0), - checker: checker, } if name == "" || usedNames[name] { return cp diff --git a/go/vt/vttablet/tabletserver/connpool/pool_test.go b/go/vt/vttablet/tabletserver/connpool/pool_test.go index 4920450402c..332ccf35841 100644 --- a/go/vt/vttablet/tabletserver/connpool/pool_test.go +++ b/go/vt/vttablet/tabletserver/connpool/pool_test.go @@ -24,6 +24,7 @@ import ( "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/vt/callerid" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "golang.org/x/net/context" ) @@ -220,19 +221,12 @@ func TestConnPoolStateWhilePoolIsOpen(t *testing.T) { } } -type dummyChecker struct { -} - -func (dummyChecker) CheckMySQL() {} - -var checker = dummyChecker{} - func newPool() *Pool { return New( + tabletenv.NewTestEnv(nil, nil), fmt.Sprintf("TestPool%d", rand.Int63()), 100, 0, 10*time.Second, - checker, ) } diff --git a/go/vt/vttablet/tabletserver/messager/engine.go b/go/vt/vttablet/tabletserver/messager/engine.go index b603e75dcc0..150d0378179 100644 --- a/go/vt/vttablet/tabletserver/messager/engine.go +++ b/go/vt/vttablet/tabletserver/messager/engine.go @@ -36,6 +36,7 @@ import ( // TabletService defines the functions of TabletServer // that the messager needs for callback. type TabletService interface { + tabletenv.Env PostponeMessages(ctx context.Context, target *querypb.Target, name string, ids []string) (count int64, err error) PurgeMessages(ctx context.Context, target *querypb.Target, name string, timeCutoff int64) (count int64, err error) } @@ -60,12 +61,12 @@ type Engine struct { } // NewEngine creates a new Engine. -func NewEngine(tsv TabletService, se *schema.Engine, vs VStreamer, config tabletenv.TabletConfig) *Engine { +func NewEngine(tsv TabletService, se *schema.Engine, vs VStreamer) *Engine { return &Engine{ tsv: tsv, se: se, vs: vs, - postponeSema: sync2.NewSemaphore(config.MessagePostponeCap, 0), + postponeSema: sync2.NewSemaphore(tsv.Config().MessagePostponeCap, 0), managers: make(map[string]*messageManager), } } diff --git a/go/vt/vttablet/tabletserver/messager/engine_test.go b/go/vt/vttablet/tabletserver/messager/engine_test.go index 953d405f38f..546db4aea59 100644 --- a/go/vt/vttablet/tabletserver/messager/engine_test.go +++ b/go/vt/vttablet/tabletserver/messager/engine_test.go @@ -175,9 +175,11 @@ func newTestEngine(db *fakesqldb.DB) *Engine { randID := rand.Int63() config := tabletenv.DefaultQsConfig config.PoolNamePrefix = fmt.Sprintf("Pool-%d-", randID) - tsv := newFakeTabletServer() - se := schema.NewEngine(tsv, config) - te := NewEngine(tsv, se, newFakeVStreamer(), config) + tsv := &fakeTabletServer{ + Env: tabletenv.NewTestEnv(&config, nil), + } + se := schema.NewEngine(tsv) + te := NewEngine(tsv, se, newFakeVStreamer()) te.Open() return te } diff --git a/go/vt/vttablet/tabletserver/messager/message_manager_test.go b/go/vt/vttablet/tabletserver/messager/message_manager_test.go index c7b41a3e632..2d766c30053 100644 --- a/go/vt/vttablet/tabletserver/messager/message_manager_test.go +++ b/go/vt/vttablet/tabletserver/messager/message_manager_test.go @@ -33,6 +33,7 @@ import ( "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" @@ -810,6 +811,7 @@ func TestMMGenerateWithBackoff(t *testing.T) { } type fakeTabletServer struct { + tabletenv.Env postponeCount sync2.AtomicInt64 purgeCount sync2.AtomicInt64 @@ -817,7 +819,12 @@ type fakeTabletServer struct { ch chan string } -func newFakeTabletServer() *fakeTabletServer { return &fakeTabletServer{} } +func newFakeTabletServer() *fakeTabletServer { + config := tabletenv.DefaultQsConfig + return &fakeTabletServer{ + Env: tabletenv.NewTestEnv(&config, nil), + } +} func (fts *fakeTabletServer) CheckMySQL() {} diff --git a/go/vt/vttablet/tabletserver/query_engine.go b/go/vt/vttablet/tabletserver/query_engine.go index 303ecf6657c..d5d1950c836 100644 --- a/go/vt/vttablet/tabletserver/query_engine.go +++ b/go/vt/vttablet/tabletserver/query_engine.go @@ -33,7 +33,6 @@ import ( "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/trace" - "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/dbconnpool" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" @@ -122,8 +121,8 @@ var ( // Close: There should be no more pending queries when this // function is called. type QueryEngine struct { - se *schema.Engine - dbconfigs *dbconfigs.DBConfigs + env tabletenv.Env + se *schema.Engine // mu protects the following fields. mu sync.RWMutex @@ -178,8 +177,10 @@ var ( // NewQueryEngine creates a new QueryEngine. // This is a singleton class. // You must call this only once. -func NewQueryEngine(checker connpool.MySQLChecker, se *schema.Engine, config tabletenv.TabletConfig) *QueryEngine { +func NewQueryEngine(env tabletenv.Env, se *schema.Engine) *QueryEngine { + config := env.Config() qe := &QueryEngine{ + env: env, se: se, tables: make(map[string]*schema.Table), plans: cache.NewLRUCache(int64(config.QueryPlanCacheSize)), @@ -187,22 +188,10 @@ func NewQueryEngine(checker connpool.MySQLChecker, se *schema.Engine, config tab queryPoolWaiterCap: sync2.NewAtomicInt64(int64(config.QueryPoolWaiterCap)), } - qe.conns = connpool.New( - config.PoolNamePrefix+"ConnPool", - config.PoolSize, - config.PoolPrefillParallelism, - time.Duration(config.IdleTimeout*1e9), - checker, - ) + qe.conns = connpool.New(env, config.PoolNamePrefix+"ConnPool", config.PoolSize, config.PoolPrefillParallelism, time.Duration(config.IdleTimeout*1e9)) qe.connTimeout.Set(time.Duration(config.QueryPoolTimeout * 1e9)) - qe.streamConns = connpool.New( - config.PoolNamePrefix+"StreamConnPool", - config.StreamPoolSize, - config.StreamPoolPrefillParallelism, - time.Duration(config.IdleTimeout*1e9), - checker, - ) + qe.streamConns = connpool.New(env, config.PoolNamePrefix+"StreamConnPool", config.StreamPoolSize, config.StreamPoolPrefillParallelism, time.Duration(config.IdleTimeout*1e9)) qe.enableConsolidator = config.EnableConsolidator qe.enableConsolidatorReplicas = config.EnableConsolidatorReplicas qe.enableQueryPlanFieldCaching = config.EnableQueryPlanFieldCaching @@ -277,14 +266,9 @@ func NewQueryEngine(checker connpool.MySQLChecker, se *schema.Engine, config tab return qe } -// InitDBConfig must be called before Open. -func (qe *QueryEngine) InitDBConfig(dbcfgs *dbconfigs.DBConfigs) { - qe.dbconfigs = dbcfgs -} - // Open must be called before sending requests to QueryEngine. func (qe *QueryEngine) Open() error { - qe.conns.Open(qe.dbconfigs.AppWithDB(), qe.dbconfigs.DbaWithDB(), qe.dbconfigs.AppDebugWithDB()) + qe.conns.Open(qe.env.DBConfigs().AppWithDB(), qe.env.DBConfigs().DbaWithDB(), qe.env.DBConfigs().AppDebugWithDB()) conn, err := qe.conns.Get(tabletenv.LocalContext()) if err != nil { @@ -301,7 +285,7 @@ func (qe *QueryEngine) Open() error { return err } - qe.streamConns.Open(qe.dbconfigs.AppWithDB(), qe.dbconfigs.DbaWithDB(), qe.dbconfigs.AppDebugWithDB()) + qe.streamConns.Open(qe.env.DBConfigs().AppWithDB(), qe.env.DBConfigs().DbaWithDB(), qe.env.DBConfigs().AppDebugWithDB()) qe.se.RegisterNotifier("qe", qe.schemaChanged) return nil } @@ -431,7 +415,7 @@ func (qe *QueryEngine) ClearQueryPlanCache() { // IsMySQLReachable returns true if we can connect to MySQL. func (qe *QueryEngine) IsMySQLReachable() bool { - conn, err := dbconnpool.NewDBConnection(qe.dbconfigs.DbaWithDB(), tabletenv.MySQLStats) + conn, err := dbconnpool.NewDBConnection(qe.env.DBConfigs().DbaWithDB(), tabletenv.MySQLStats) if err != nil { if mysql.IsConnErr(err) { return false diff --git a/go/vt/vttablet/tabletserver/query_engine_test.go b/go/vt/vttablet/tabletserver/query_engine_test.go index 2aef69cbed6..f6ec50e79b8 100644 --- a/go/vt/vttablet/tabletserver/query_engine_test.go +++ b/go/vt/vttablet/tabletserver/query_engine_test.go @@ -51,9 +51,9 @@ func TestStrictMode(t *testing.T) { // Test default behavior. config := tabletenv.DefaultQsConfig - // config.EnforceStrictTransTable is true by default. - qe := NewQueryEngine(DummyChecker, schema.NewEngine(DummyChecker, config), config) - qe.InitDBConfig(dbcfgs) + env := tabletenv.NewTestEnv(&config, dbcfgs) + se := schema.NewEngine(env) + qe := NewQueryEngine(env, se) qe.se.InitDBConfig(dbcfgs.DbaWithDB()) qe.se.Open() if err := qe.Open(); err != nil { @@ -69,8 +69,7 @@ func TestStrictMode(t *testing.T) { Rows: [][]sqltypes.Value{{sqltypes.NewVarBinary("")}}, }, ) - qe = NewQueryEngine(DummyChecker, schema.NewEngine(DummyChecker, config), config) - qe.InitDBConfig(dbcfgs) + qe = NewQueryEngine(env, se) err := qe.Open() wantErr := "require sql_mode to be STRICT_TRANS_TABLES or STRICT_ALL_TABLES: got ''" if err == nil || err.Error() != wantErr { @@ -80,8 +79,7 @@ func TestStrictMode(t *testing.T) { // Test that we succeed if the enforcement flag is off. config.EnforceStrictTransTables = false - qe = NewQueryEngine(DummyChecker, schema.NewEngine(DummyChecker, config), config) - qe.InitDBConfig(dbcfgs) + qe = NewQueryEngine(env, se) if err := qe.Open(); err != nil { t.Fatal(err) } @@ -296,10 +294,10 @@ func newTestQueryEngine(queryPlanCacheSize int, idleTimeout time.Duration, stric config := tabletenv.DefaultQsConfig config.QueryPlanCacheSize = queryPlanCacheSize config.IdleTimeout = float64(idleTimeout) / 1e9 - se := schema.NewEngine(DummyChecker, config) - qe := NewQueryEngine(DummyChecker, se, config) + env := tabletenv.NewTestEnv(&config, dbcfgs) + se := schema.NewEngine(env) + qe := NewQueryEngine(env, se) se.InitDBConfig(dbcfgs.DbaWithDB()) - qe.InitDBConfig(dbcfgs) return qe } diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 01bff7212d7..28dba73ae90 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -48,7 +48,8 @@ type notifier func(full map[string]*Table, created, altered, dropped []string) // Engine stores the schema info and performs operations that // keep itself up-to-date. type Engine struct { - cp dbconfigs.Connector + env tabletenv.Env + cp dbconfigs.Connector // mu protects the following fields. mu sync.Mutex @@ -67,13 +68,14 @@ type Engine struct { var schemaOnce sync.Once // NewEngine creates a new Engine. -func NewEngine(checker connpool.MySQLChecker, config tabletenv.TabletConfig) *Engine { - reloadTime := time.Duration(config.SchemaReloadTime * 1e9) - idleTimeout := time.Duration(config.IdleTimeout * 1e9) +func NewEngine(env tabletenv.Env) *Engine { + reloadTime := time.Duration(env.Config().SchemaReloadTime * 1e9) + idleTimeout := time.Duration(env.Config().IdleTimeout * 1e9) se := &Engine{ + env: env, // We need only one connection because the reloader is // the only one that needs this. - conns: connpool.New("", 1, 0, idleTimeout, checker), + conns: connpool.New(env, "", 1, 0, idleTimeout), ticks: timer.NewTimer(reloadTime), reloadTime: reloadTime, } diff --git a/go/vt/vttablet/tabletserver/schema/engine_test.go b/go/vt/vttablet/tabletserver/schema/engine_test.go index 62269d76891..3922b0469b0 100644 --- a/go/vt/vttablet/tabletserver/schema/engine_test.go +++ b/go/vt/vttablet/tabletserver/schema/engine_test.go @@ -298,19 +298,12 @@ func TestStatsURL(t *testing.T) { se.ServeHTTP(response, request) } -type dummyChecker struct { -} - -func (dummyChecker) CheckMySQL() {} - -var DummyChecker = dummyChecker{} - func newEngine(queryPlanCacheSize int, reloadTime time.Duration, idleTimeout time.Duration, strict bool, db *fakesqldb.DB) *Engine { config := tabletenv.DefaultQsConfig config.QueryPlanCacheSize = queryPlanCacheSize config.SchemaReloadTime = float64(reloadTime) / 1e9 config.IdleTimeout = float64(idleTimeout) / 1e9 - se := NewEngine(DummyChecker, config) + se := NewEngine(tabletenv.NewTestEnv(&config, nil)) se.InitDBConfig(newDBConfigs(db).DbaWithDB()) return se } diff --git a/go/vt/vttablet/tabletserver/schema/load_table_test.go b/go/vt/vttablet/tabletserver/schema/load_table_test.go index 0624479e29c..7315b9eb1c8 100644 --- a/go/vt/vttablet/tabletserver/schema/load_table_test.go +++ b/go/vt/vttablet/tabletserver/schema/load_table_test.go @@ -23,12 +23,14 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "golang.org/x/net/context" "vitess.io/vitess/go/mysql/fakesqldb" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" querypb "vitess.io/vitess/go/vt/proto/query" ) @@ -133,6 +135,7 @@ func TestLoadTableMessage(t *testing.T) { // Test loading min/max backoff table, err = newTestLoadTable("USER_TABLE", "vitess_message,vt_ack_wait=30,vt_purge_after=120,vt_batch_size=1,vt_cache_size=10,vt_poller_interval=30,vt_min_backoff=10,vt_max_backoff=100", db) + require.NoError(t, err) want.MessageInfo.MinBackoff = 10 * time.Second want.MessageInfo.MaxBackoff = 100 * time.Second assert.Equal(t, want, table) @@ -159,7 +162,7 @@ func newTestLoadTable(tableType string, comment string, db *fakesqldb.DB) (*Tabl appParams := db.ConnParams() dbaParams := db.ConnParams() connPoolIdleTimeout := 10 * time.Second - connPool := connpool.New("", 2, 0, connPoolIdleTimeout, DummyChecker) + connPool := connpool.New(tabletenv.NewTestEnv(nil, nil), "", 2, 0, connPoolIdleTimeout) connPool.Open(appParams, dbaParams, appParams) conn, err := connPool.Get(ctx) if err != nil { diff --git a/go/vt/vttablet/tabletserver/tabletenv/tabletenv.go b/go/vt/vttablet/tabletserver/tabletenv/tabletenv.go index d3945dafee4..d590b2e79ee 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/tabletenv.go +++ b/go/vt/vttablet/tabletserver/tabletenv/tabletenv.go @@ -26,6 +26,7 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/tb" "vitess.io/vitess/go/vt/callerid" + "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" @@ -119,6 +120,32 @@ var ( Errorf = log.Errorf ) +// Env defines the functions supported by TabletServer +// that the sub-componennts need to access. +type Env interface { + CheckMySQL() + Config() *TabletConfig + DBConfigs() *dbconfigs.DBConfigs +} + +type testEnv struct { + config *TabletConfig + dbconfigs *dbconfigs.DBConfigs +} + +// NewTestEnv creates an Env that can be used for tests. +// CheckMySQL is a no-op. +func NewTestEnv(config *TabletConfig, dbconfigs *dbconfigs.DBConfigs) Env { + return &testEnv{ + config: config, + dbconfigs: dbconfigs, + } +} + +func (*testEnv) CheckMySQL() {} +func (te *testEnv) Config() *TabletConfig { return te.config } +func (te *testEnv) DBConfigs() *dbconfigs.DBConfigs { return te.dbconfigs } + // RecordUserQuery records the query data against the user. func RecordUserQuery(ctx context.Context, tableName sqlparser.TableIdent, queryType string, duration int64) { username := callerid.GetPrincipal(callerid.EffectiveCallerIDFromContext(ctx)) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index d2fe7079bb7..ff04d500bcf 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -47,6 +47,7 @@ import ( querypb "vitess.io/vitess/go/vt/proto/query" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/tableacl" @@ -133,6 +134,8 @@ func stateInfo(state int64) string { // Open and Close can be called repeatedly during the lifetime of // a subcomponent. These should also be idempotent. type TabletServer struct { + exporter *servenv.Exporter + config *tabletenv.TabletConfig QueryTimeout sync2.AtomicDuration TerseErrors bool enableHotRowProtection bool @@ -208,6 +211,7 @@ var srvTopoServer srvtopo.Server // instance of TabletServer will expose its state variables. func NewTabletServer(config tabletenv.TabletConfig, topoServer *topo.Server, alias topodatapb.TabletAlias) *TabletServer { tsv := &TabletServer{ + config: &config, QueryTimeout: sync2.NewAtomicDuration(time.Duration(config.QueryTimeout * 1e9)), TerseErrors: config.TerseErrors, enableHotRowProtection: config.EnableHotRowProtection || config.EnableHotRowProtectionDryRun, @@ -217,15 +221,12 @@ func NewTabletServer(config tabletenv.TabletConfig, topoServer *topo.Server, ali topoServer: topoServer, alias: alias, } - tsv.se = schema.NewEngine(tsv, config) - tsv.qe = NewQueryEngine(tsv, tsv.se, config) - tsv.te = NewTxEngine(tsv, config) - tsv.hw = heartbeat.NewWriter(tsv, alias, config) - tsv.hr = heartbeat.NewReader(tsv, config) + tsv.se = schema.NewEngine(tsv) + tsv.qe = NewQueryEngine(tsv, tsv.se) + tsv.te = NewTxEngine(tsv) + tsv.hw = heartbeat.NewWriter(tsv, alias) + tsv.hr = heartbeat.NewReader(tsv) tsv.txThrottler = txthrottler.CreateTxThrottlerFromTabletConfig(topoServer) - // FIXME(alainjobart) could we move this to the Register method below? - // So that vtcombo doesn't even call it once, on the first tablet. - // And we can remove the tsOnce variable. tsOnce.Do(func() { srvTopoServer = srvtopo.NewResilientServer(topoServer, "TabletSrvTopo") stats.NewGaugeFunc("TabletState", "Tablet server state", func() int64 { @@ -244,10 +245,9 @@ func NewTabletServer(config tabletenv.TabletConfig, topoServer *topo.Server, ali stats.NewGaugeDurationFunc("QueryTimeout", "Tablet server query timeout", tsv.QueryTimeout.Get) stats.NewGaugeDurationFunc("QueryPoolTimeout", "Tablet server timeout to get a connection from the query pool", tsv.qe.connTimeout.Get) }) - // TODO(sougou): move this up once the stats naming problem is fixed. - tsv.vstreamer = vstreamer.NewEngine(srvTopoServer, tsv.se) + tsv.vstreamer = vstreamer.NewEngine(tsv, srvTopoServer, tsv.se) tsv.watcher = NewReplicationWatcher(tsv.vstreamer, config) - tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer, config) + tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer) return tsv } @@ -263,6 +263,21 @@ func (tsv *TabletServer) Register() { tsv.registerTwopczHandler() } +// Exporter satisfies tabletenv.Env. +func (tsv *TabletServer) Exporter() *servenv.Exporter { + return tsv.exporter +} + +// Config satisfies tabletenv.Env. +func (tsv *TabletServer) Config() *tabletenv.TabletConfig { + return tsv.config +} + +// DBConfigs satisfies tabletenv.Env. +func (tsv *TabletServer) DBConfigs() *dbconfigs.DBConfigs { + return tsv.dbconfigs +} + // RegisterQueryRuleSource registers ruleSource for setting query rules. func (tsv *TabletServer) RegisterQueryRuleSource(ruleSource string) { tsv.qe.queryRuleSources.RegisterSource(ruleSource) @@ -330,11 +345,6 @@ func (tsv *TabletServer) InitDBConfig(target querypb.Target, dbcfgs *dbconfigs.D tsv.dbconfigs = dbcfgs tsv.se.InitDBConfig(tsv.dbconfigs.DbaWithDB()) - tsv.qe.InitDBConfig(tsv.dbconfigs) - tsv.te.InitDBConfig(tsv.dbconfigs) - tsv.hw.InitDBConfig(tsv.dbconfigs) - tsv.hr.InitDBConfig(tsv.dbconfigs) - tsv.vstreamer.InitDBConfig(tsv.dbconfigs.DbaWithDB()) return nil } @@ -649,6 +659,7 @@ func (tsv *TabletServer) IsHealthy() error { // CheckMySQL initiates a check to see if MySQL is reachable. // If not, it shuts down the query service. The check is rate-limited // to no more than once per second. +// The function satisfies tabletenv.Env. func (tsv *TabletServer) CheckMySQL() { if !tsv.checkMySQLThrottler.TryAcquire() { return diff --git a/go/vt/vttablet/tabletserver/tabletserver_flaky_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go similarity index 99% rename from go/vt/vttablet/tabletserver/tabletserver_flaky_test.go rename to go/vt/vttablet/tabletserver/tabletserver_test.go index defc06c5f35..dddc7af6289 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_flaky_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -1439,7 +1439,10 @@ func TestDMLQueryWithoutWhereClause(t *testing.T) { db.AddQuery(q+" limit 10001", &sqltypes.Result{}) - _, _, err = tsv.BeginExecute(context.Background(), &target, q, nil, nil) + ctx := context.Background() + _, txid, err := tsv.BeginExecute(ctx, &target, q, nil, nil) + require.NoError(t, err) + err = tsv.Commit(ctx, &target, txid) require.NoError(t, err) } diff --git a/go/vt/vttablet/tabletserver/testutils_test.go b/go/vt/vttablet/tabletserver/testutils_test.go index 2a162d2ee70..e6116eb66ea 100644 --- a/go/vt/vttablet/tabletserver/testutils_test.go +++ b/go/vt/vttablet/tabletserver/testutils_test.go @@ -28,13 +28,6 @@ import ( var errRejected = errors.New("rejected") -type dummyChecker struct { -} - -func (dummyChecker) CheckMySQL() {} - -var DummyChecker = dummyChecker{} - type testUtils struct{} func newTestUtils() *testUtils { diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index e467b6b18fa..80dcf4d36aa 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -26,7 +26,6 @@ import ( "vitess.io/vitess/go/timer" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/concurrency" - "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/dtids" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/proto/vtrpc" @@ -67,6 +66,7 @@ func (state txEngineState) String() string { // states. It will start and shut down the underlying tx-pool as required. // It does this in a concurrently safe way. type TxEngine struct { + env tabletenv.Env // the following four fields are interconnected. `state` and `nextState` should be protected by the // `stateLock` // @@ -84,8 +84,6 @@ type TxEngine struct { // transition while creating new transactions beginRequests sync.WaitGroup - dbconfigs *dbconfigs.DBConfigs - twopcEnabled bool shutdownGracePeriod time.Duration coordinatorAddress string @@ -98,32 +96,14 @@ type TxEngine struct { } // NewTxEngine creates a new TxEngine. -func NewTxEngine(checker connpool.MySQLChecker, config tabletenv.TabletConfig) *TxEngine { +func NewTxEngine(env tabletenv.Env) *TxEngine { + config := env.Config() te := &TxEngine{ + env: env, shutdownGracePeriod: time.Duration(config.TxShutDownGracePeriod * 1e9), } - limiter := txlimiter.New( - config.TransactionCap, - config.TransactionLimitPerUser, - config.EnableTransactionLimit, - config.EnableTransactionLimitDryRun, - config.TransactionLimitByUsername, - config.TransactionLimitByPrincipal, - config.TransactionLimitByComponent, - config.TransactionLimitBySubcomponent, - ) - te.txPool = NewTxPool( - config.PoolNamePrefix, - config.TransactionCap, - config.FoundRowsPoolSize, - config.TxPoolPrefillParallelism, - time.Duration(config.TransactionTimeout*1e9), - time.Duration(config.TxPoolTimeout*1e9), - time.Duration(config.IdleTimeout*1e9), - config.TxPoolWaiterCap, - checker, - limiter, - ) + limiter := txlimiter.New(env) + te.txPool = NewTxPool(env, limiter) te.twopcEnabled = config.TwoPCEnable if te.twopcEnabled { if config.TwoPCCoordinatorAddress == "" { @@ -145,13 +125,7 @@ func NewTxEngine(checker connpool.MySQLChecker, config tabletenv.TabletConfig) * // the system can deadlock if all connections get moved to // the TxPreparedPool. te.preparedPool = NewTxPreparedPool(config.TransactionCap - 2) - readPool := connpool.New( - config.PoolNamePrefix+"TxReadPool", - 3, - 0, - time.Duration(config.IdleTimeout*1e9), - checker, - ) + readPool := connpool.New(env, config.PoolNamePrefix+"TxReadPool", 3, 0, time.Duration(config.IdleTimeout*1e9)) te.twoPC = NewTwoPC(readPool) te.transitionSignal = make(chan struct{}) // By immediately closing this channel, all state changes can simply be made blocking by issuing the @@ -359,16 +333,11 @@ func (te *TxEngine) transitionTo(nextState txEngineState) error { return nil } -// InitDBConfig must be called before Init. -func (te *TxEngine) InitDBConfig(dbcfgs *dbconfigs.DBConfigs) { - te.dbconfigs = dbcfgs -} - // Init must be called once when vttablet starts for setting // up the metadata tables. func (te *TxEngine) Init() error { if te.twopcEnabled { - return te.twoPC.Init(te.dbconfigs.SidecarDBName.Get(), te.dbconfigs.DbaWithDB()) + return te.twoPC.Init(te.env.DBConfigs().SidecarDBName.Get(), te.env.DBConfigs().DbaWithDB()) } return nil } @@ -377,10 +346,10 @@ func (te *TxEngine) Init() error { // all previously prepared transactions from the redo log. // this should only be called when the state is already locked func (te *TxEngine) open() { - te.txPool.Open(te.dbconfigs.AppWithDB(), te.dbconfigs.DbaWithDB(), te.dbconfigs.AppDebugWithDB()) + te.txPool.Open(te.env.DBConfigs().AppWithDB(), te.env.DBConfigs().DbaWithDB(), te.env.DBConfigs().AppDebugWithDB()) if te.twopcEnabled && te.state == AcceptingReadAndWrite { - te.twoPC.Open(te.dbconfigs) + te.twoPC.Open(te.env.DBConfigs()) if err := te.prepareFromRedo(); err != nil { // If this operation fails, we choose to raise an alert and // continue anyway. Serving traffic is considered more important diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go index 710d1c9f5d6..da2e0468246 100644 --- a/go/vt/vttablet/tabletserver/tx_engine_test.go +++ b/go/vt/vttablet/tabletserver/tx_engine_test.go @@ -42,8 +42,7 @@ func TestTxEngineClose(t *testing.T) { config.TransactionCap = 10 config.TransactionTimeout = 0.5 config.TxShutDownGracePeriod = 0 - te := NewTxEngine(nil, config) - te.InitDBConfig(dbcfgs) + te := NewTxEngine(tabletenv.NewTestEnv(&config, dbcfgs)) // Normal close. te.open() @@ -467,8 +466,7 @@ func setupTxEngine(db *fakesqldb.DB) *TxEngine { config.TransactionCap = 10 config.TransactionTimeout = 0.5 config.TxShutDownGracePeriod = 0 - te := NewTxEngine(nil, config) - te.InitDBConfig(dbcfgs) + te := NewTxEngine(tabletenv.NewTestEnv(&config, dbcfgs)) return te } diff --git a/go/vt/vttablet/tabletserver/tx_pool.go b/go/vt/vttablet/tabletserver/tx_pool.go index f5312e76b35..d1045fcf420 100644 --- a/go/vt/vttablet/tabletserver/tx_pool.go +++ b/go/vt/vttablet/tabletserver/tx_pool.go @@ -76,6 +76,7 @@ var ( // TxPool is the transaction pool for the query service. type TxPool struct { + env tabletenv.Env // conns is the 'regular' pool. By default, connections // are pulled from here for starting transactions. conns *connpool.Pool @@ -89,7 +90,6 @@ type TxPool struct { transactionTimeout sync2.AtomicDuration transactionPoolTimeout sync2.AtomicDuration ticks *timer.Timer - checker connpool.MySQLChecker limiter txlimiter.TxLimiter // Tracking culprits that cause tx pool full errors. logMu sync.Mutex @@ -99,28 +99,21 @@ type TxPool struct { } // NewTxPool creates a new TxPool. It's not operational until it's Open'd. -func NewTxPool( - prefix string, - capacity int, - foundRowsCapacity int, - prefillParallelism int, - transactionTimeout time.Duration, - transactionPoolTimeout time.Duration, - idleTimeout time.Duration, - waiterCap int, - checker connpool.MySQLChecker, - limiter txlimiter.TxLimiter) *TxPool { +func NewTxPool(env tabletenv.Env, limiter txlimiter.TxLimiter) *TxPool { + config := env.Config() + prefix := config.PoolNamePrefix + transactionTimeout := time.Duration(config.TransactionTimeout * 1e9) axp := &TxPool{ - conns: connpool.New(prefix+"TransactionPool", capacity, prefillParallelism, idleTimeout, checker), - foundRowsPool: connpool.New(prefix+"FoundRowsPool", foundRowsCapacity, prefillParallelism, idleTimeout, checker), + env: env, + conns: connpool.New(env, prefix+"TransactionPool", config.TransactionCap, config.TxPoolPrefillParallelism, time.Duration(config.IdleTimeout*1e9)), + foundRowsPool: connpool.New(env, prefix+"FoundRowsPool", config.FoundRowsPoolSize, config.TxPoolPrefillParallelism, time.Duration(config.IdleTimeout*1e9)), activePool: pools.NewNumbered(), lastID: sync2.NewAtomicInt64(time.Now().UnixNano()), transactionTimeout: sync2.NewAtomicDuration(transactionTimeout), - transactionPoolTimeout: sync2.NewAtomicDuration(transactionPoolTimeout), - waiterCap: sync2.NewAtomicInt64(int64(waiterCap)), + transactionPoolTimeout: sync2.NewAtomicDuration(time.Duration(config.TxPoolTimeout * 1e9)), + waiterCap: sync2.NewAtomicInt64(int64(config.TxPoolWaiterCap)), waiters: sync2.NewAtomicInt64(0), ticks: timer.NewTimer(transactionTimeout / 10), - checker: checker, limiter: limiter, } txOnce.Do(func() { @@ -463,7 +456,7 @@ func (txc *TxConnection) Exec(ctx context.Context, query string, maxrows int, wa // If the context is done, the query was killed. // So, don't trigger a mysql check. default: - txc.pool.checker.CheckMySQL() + txc.pool.env.CheckMySQL() } } return nil, err diff --git a/go/vt/vttablet/tabletserver/tx_pool_test.go b/go/vt/vttablet/tabletserver/tx_pool_test.go index 9912896cd7d..5052967aded 100644 --- a/go/vt/vttablet/tabletserver/tx_pool_test.go +++ b/go/vt/vttablet/tabletserver/tx_pool_test.go @@ -697,24 +697,14 @@ func TestTxPoolCloseKillsStrayTransactions(t *testing.T) { } func newTxPool() *TxPool { + config := tabletenv.DefaultQsConfig randID := rand.Int63() - poolName := fmt.Sprintf("TestTransactionPool-%d", randID) - transactionCap := 300 - transactionTimeout := time.Duration(30 * time.Second) - transactionPoolTimeout := time.Duration(40 * time.Second) - waiterCap := 500000 - idleTimeout := time.Duration(30 * time.Second) + config.PoolNamePrefix = fmt.Sprintf("TestTransactionPool-%d", randID) + config.TransactionCap = 300 + config.TransactionTimeout = 30 + config.TxPoolTimeout = 40 + config.TxPoolWaiterCap = 500000 + config.IdleTimeout = 30 limiter := &txlimiter.TxAllowAll{} - return NewTxPool( - poolName, - transactionCap, - transactionCap, - 0, - transactionTimeout, - transactionPoolTimeout, - idleTimeout, - waiterCap, - DummyChecker, - limiter, - ) + return NewTxPool(tabletenv.NewTestEnv(&config, nil), limiter) } diff --git a/go/vt/vttablet/tabletserver/txlimiter/tx_limiter.go b/go/vt/vttablet/tabletserver/txlimiter/tx_limiter.go index a8fe9b64f02..06f2cb8da76 100644 --- a/go/vt/vttablet/tabletserver/txlimiter/tx_limiter.go +++ b/go/vt/vttablet/tabletserver/txlimiter/tx_limiter.go @@ -23,6 +23,7 @@ import ( "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/callerid" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" @@ -50,19 +51,20 @@ type TxLimiter interface { // byXXX: whether given field from immediate/effective caller id should be taken // into account when deciding "user" identity for purposes of transaction // limiting. -func New(slotCount int, maxPerUser float64, enabled, dryRun, byUsername, byPrincipal, byComponent, bySubcomponent bool) TxLimiter { - if !enabled && !dryRun { +func New(env tabletenv.Env) TxLimiter { + config := env.Config() + if !config.EnableTransactionLimit && !config.EnableTransactionLimitDryRun { return &TxAllowAll{} } return &Impl{ - maxPerUser: int64(float64(slotCount) * maxPerUser), - dryRun: dryRun, - byUsername: byUsername, - byPrincipal: byPrincipal, - byComponent: byComponent, - bySubcomponent: bySubcomponent, - byEffectiveUser: byPrincipal || byComponent || bySubcomponent, + maxPerUser: int64(float64(config.TransactionCap) * config.TransactionLimitPerUser), + dryRun: config.EnableTransactionLimitDryRun, + byUsername: config.TransactionLimitByUsername, + byPrincipal: config.TransactionLimitByPrincipal, + byComponent: config.TransactionLimitByComponent, + bySubcomponent: config.TransactionLimitBySubcomponent, + byEffectiveUser: config.TransactionLimitByPrincipal || config.TransactionLimitByComponent || config.TransactionLimitBySubcomponent, usageMap: make(map[string]int64), } } diff --git a/go/vt/vttablet/tabletserver/txlimiter/tx_limiter_test.go b/go/vt/vttablet/tabletserver/txlimiter/tx_limiter_test.go index 09616fe8948..b9ea7af1397 100644 --- a/go/vt/vttablet/tabletserver/txlimiter/tx_limiter_test.go +++ b/go/vt/vttablet/tabletserver/txlimiter/tx_limiter_test.go @@ -20,6 +20,7 @@ import ( "testing" "vitess.io/vitess/go/vt/callerid" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" querypb "vitess.io/vitess/go/vt/proto/query" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" @@ -37,7 +38,16 @@ func createCallers(username, principal, component, subcomponent string) (*queryp } func TestTxLimiter_DisabledAllowsAll(t *testing.T) { - limiter := New(10, 0.1, false, false, false, false, false, false) + config := tabletenv.DefaultQsConfig + config.TransactionCap = 10 + config.TransactionLimitPerUser = 0.1 + config.EnableTransactionLimit = false + config.EnableTransactionLimitDryRun = false + config.TransactionLimitByUsername = false + config.TransactionLimitByPrincipal = false + config.TransactionLimitByComponent = false + config.TransactionLimitBySubcomponent = false + limiter := New(tabletenv.NewTestEnv(&config, nil)) im, ef := createCallers("", "", "", "") for i := 0; i < 5; i++ { if got, want := limiter.Get(im, ef), true; got != want { @@ -50,8 +60,18 @@ func TestTxLimiter_DisabledAllowsAll(t *testing.T) { func TestTxLimiter_LimitsOnlyOffendingUser(t *testing.T) { resetVariables() + config := tabletenv.DefaultQsConfig + config.TransactionCap = 10 + config.TransactionLimitPerUser = 0.3 + config.EnableTransactionLimit = true + config.EnableTransactionLimitDryRun = false + config.TransactionLimitByUsername = true + config.TransactionLimitByPrincipal = false + config.TransactionLimitByComponent = false + config.TransactionLimitBySubcomponent = false + // This should allow 3 slots to all users - newlimiter := New(10, 0.3, true, false, true, false, false, false) + newlimiter := New(tabletenv.NewTestEnv(&config, nil)) limiter, ok := newlimiter.(*Impl) if !ok { t.Fatalf("New returned limiter of unexpected type: got %T, want %T", newlimiter, limiter) @@ -107,8 +127,18 @@ func TestTxLimiter_LimitsOnlyOffendingUser(t *testing.T) { func TestTxLimiterDryRun(t *testing.T) { resetVariables() + config := tabletenv.DefaultQsConfig + config.TransactionCap = 10 + config.TransactionLimitPerUser = 0.3 + config.EnableTransactionLimit = true + config.EnableTransactionLimitDryRun = true + config.TransactionLimitByUsername = true + config.TransactionLimitByPrincipal = false + config.TransactionLimitByComponent = false + config.TransactionLimitBySubcomponent = false + // This should allow 3 slots to all users - newlimiter := New(10, 0.3, true, true, true, false, false, false) + newlimiter := New(tabletenv.NewTestEnv(&config, nil)) limiter, ok := newlimiter.(*Impl) if !ok { t.Fatalf("New returned limiter of unexpected type: got %T, want %T", newlimiter, limiter) diff --git a/go/vt/vttablet/tabletserver/vstreamer/engine.go b/go/vt/vttablet/tabletserver/vstreamer/engine.go index 5afe7558cc7..e1005f4a3be 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/engine.go +++ b/go/vt/vttablet/tabletserver/vstreamer/engine.go @@ -27,12 +27,12 @@ import ( "vitess.io/vitess/go/acl" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" vschemapb "vitess.io/vitess/go/vt/proto/vschema" @@ -46,8 +46,7 @@ var ( // Engine is the engine for handling vreplication streaming requests. type Engine struct { - // cp is initialized by InitDBConfig - cp dbconfigs.Connector + env tabletenv.Env // mu protects isOpen, streamers, streamIdx and vschema. mu sync.Mutex @@ -78,8 +77,9 @@ type Engine struct { // NewEngine creates a new Engine. // Initialization sequence is: NewEngine->InitDBConfig->Open. // Open and Close can be called multiple times and are idempotent. -func NewEngine(ts srvtopo.Server, se *schema.Engine) *Engine { +func NewEngine(env tabletenv.Env, ts srvtopo.Server, se *schema.Engine) *Engine { vse := &Engine{ + env: env, streamers: make(map[int]*vstreamer), rowStreamers: make(map[int]*rowStreamer), resultStreamers: make(map[int]*resultStreamer), @@ -95,11 +95,6 @@ func NewEngine(ts srvtopo.Server, se *schema.Engine) *Engine { return vse } -// InitDBConfig performs saves the required info from dbconfigs for future use. -func (vse *Engine) InitDBConfig(cp dbconfigs.Connector) { - vse.cp = cp -} - // Open starts the Engine service. func (vse *Engine) Open(keyspace, cell string) error { vse.mu.Lock() @@ -166,7 +161,7 @@ func (vse *Engine) Stream(ctx context.Context, startPos string, filter *binlogda if !vse.isOpen { return nil, 0, errors.New("VStreamer is not open") } - streamer := newVStreamer(ctx, vse.cp, vse.se, startPos, filter, vse.lvschema, send) + streamer := newVStreamer(ctx, vse.env.DBConfigs().DbaWithDB(), vse.se, startPos, filter, vse.lvschema, send) idx := vse.streamIdx vse.streamers[idx] = streamer vse.streamIdx++ @@ -206,7 +201,7 @@ func (vse *Engine) StreamRows(ctx context.Context, query string, lastpk []sqltyp if !vse.isOpen { return nil, 0, errors.New("VStreamer is not open") } - rowStreamer := newRowStreamer(ctx, vse.cp, vse.se, query, lastpk, vse.lvschema, send) + rowStreamer := newRowStreamer(ctx, vse.env.DBConfigs().AppWithDB(), vse.se, query, lastpk, vse.lvschema, send) idx := vse.streamIdx vse.rowStreamers[idx] = rowStreamer vse.streamIdx++ @@ -240,7 +235,7 @@ func (vse *Engine) StreamResults(ctx context.Context, query string, send func(*b if !vse.isOpen { return nil, 0, errors.New("VStreamer is not open") } - resultStreamer := newResultStreamer(ctx, vse.cp, query, send) + resultStreamer := newResultStreamer(ctx, vse.env.DBConfigs().AppWithDB(), query, send) idx := vse.streamIdx vse.resultStreamers[idx] = resultStreamer vse.streamIdx++ diff --git a/go/vt/vttablet/tabletserver/vstreamer/main_test.go b/go/vt/vttablet/tabletserver/vstreamer/main_test.go index cda8ad22251..ff75bd0767d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/main_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/main_test.go @@ -22,6 +22,10 @@ import ( "os" "testing" + "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer/testenv" ) @@ -48,8 +52,7 @@ func TestMain(m *testing.M) { // engine cannot be initialized in testenv because it introduces // circular dependencies. - engine = NewEngine(env.SrvTopo, env.SchemaEngine) - engine.InitDBConfig(env.Dbcfgs.DbaWithDB()) + engine = NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine) engine.Open(env.KeyspaceName, env.Cells[0]) defer engine.Close() @@ -57,3 +60,13 @@ func TestMain(m *testing.M) { }() os.Exit(exitCode) } + +func customEngine(t *testing.T, modifier func(mysql.ConnParams) mysql.ConnParams) *Engine { + original, err := env.Dbcfgs.AppWithDB().MysqlParams() + require.NoError(t, err) + modified := modifier(*original) + dbcfgs := dbconfigs.NewTestDBConfigs(modified, modified, modified.DbName) + engine := NewEngine(tabletenv.NewTestEnv(env.TabletEnv.Config(), dbcfgs), env.SrvTopo, env.SchemaEngine) + engine.Open(env.KeyspaceName, env.Cells[0]) + return engine +} diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go index 01056c893e3..36e7dfcbf11 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/stretchr/testify/require" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -139,7 +140,15 @@ func TestStreamRowsUnicode(t *testing.T) { defer execStatements(t, []string{ "drop table t1", }) - engine.se.Reload(context.Background()) + + // Use an engine with latin1 charset. + savedEngine := engine + defer func() { engine = savedEngine }() + engine = customEngine(t, func(in mysql.ConnParams) mysql.ConnParams { + in.Charset = "latin1" + return in + }) + defer engine.Close() // We need a latin1 connection. conn, err := env.Mysqld.GetDbaConnection() @@ -156,14 +165,6 @@ func TestStreamRowsUnicode(t *testing.T) { t.Fatal(err) } - savecp := engine.cp - // Rowstreamer must override this to "binary" - params, err := engine.cp.MysqlParams() - if err != nil { - t.Fatal(err) - } - params.Charset = "latin1" - defer func() { engine.cp = savecp }() err = engine.StreamRows(context.Background(), "select * from t1", nil, func(rows *binlogdatapb.VStreamRowsResponse) error { // Skip fields. if len(rows.Rows) == 0 { diff --git a/go/vt/vttablet/tabletserver/vstreamer/snapshot_conn_test.go b/go/vt/vttablet/tabletserver/vstreamer/snapshot_conn_test.go index 8824687b161..8d0f5955285 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/snapshot_conn_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/snapshot_conn_test.go @@ -39,7 +39,7 @@ func TestStartSnapshot(t *testing.T) { }) ctx := context.Background() - conn, err := snapshotConnect(ctx, engine.cp) + conn, err := snapshotConnect(ctx, env.TabletEnv.DBConfigs().AppWithDB()) require.NoError(t, err) defer conn.Close() diff --git a/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go b/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go index 607ec3cc2a3..a556ed0d819 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go +++ b/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go @@ -28,7 +28,6 @@ import ( "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" - "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttest" @@ -46,6 +45,7 @@ type Env struct { ShardName string Cells []string + TabletEnv tabletenv.Env TopoServ *topo.Server SrvTopo srvtopo.Server Dbcfgs *dbconfigs.DBConfigs @@ -53,12 +53,6 @@ type Env struct { SchemaEngine *schema.Engine } -type checker struct{} - -var _ = connpool.MySQLChecker(checker{}) - -func (checker) CheckMySQL() {} - // Init initializes an Env. func Init() (*Env, error) { te := &Env{ @@ -102,8 +96,10 @@ func Init() (*Env, error) { } te.Dbcfgs = dbconfigs.NewTestDBConfigs(te.cluster.MySQLConnParams(), te.cluster.MySQLAppDebugConnParams(), te.cluster.DbName()) + config := tabletenv.DefaultQsConfig + te.TabletEnv = tabletenv.NewTestEnv(&config, te.Dbcfgs) te.Mysqld = mysqlctl.NewMysqld(te.Dbcfgs) - te.SchemaEngine = schema.NewEngine(checker{}, tabletenv.DefaultQsConfig) + te.SchemaEngine = schema.NewEngine(te.TabletEnv) te.SchemaEngine.InitDBConfig(te.Dbcfgs.DbaWithDB()) // The first vschema should not be empty. Leads to Node not found error. diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index 98cb8e2ddc1..bdcc805404e 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -223,12 +223,13 @@ func TestStatements(t *testing.T) { runCases(t, nil, testcases, "current") // Test FilePos flavor - params, err := engine.cp.MysqlParams() - if err != nil { - t.Fatal(err) - } - params.Flavor = "FilePos" - defer func() { params.Flavor = "" }() + savedEngine := engine + defer func() { engine = savedEngine }() + engine = customEngine(t, func(in mysql.ConnParams) mysql.ConnParams { + in.Flavor = "FilePos" + return in + }) + defer engine.Close() runCases(t, nil, testcases, "current") } @@ -293,13 +294,13 @@ func TestOther(t *testing.T) { customRun("gtid") // Test FilePos flavor - params, err := engine.cp.MysqlParams() - if err != nil { - t.Fatal(err) - } - params.Flavor = "FilePos" - - defer func() { params.Flavor = "" }() + savedEngine := engine + defer func() { engine = savedEngine }() + engine = customEngine(t, func(in mysql.ConnParams) mysql.ConnParams { + in.Flavor = "FilePos" + return in + }) + defer engine.Close() customRun("filePos") } @@ -1424,7 +1425,7 @@ func masterPosition(t *testing.T) string { // We use the engine's cp because there is one test that overrides // the flavor to FilePos. If so, we have to obtain the position // in that flavor format. - connParam, err := engine.cp.MysqlParams() + connParam, err := engine.env.DBConfigs().DbaWithDB().MysqlParams() if err != nil { t.Fatal(err) }