Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 7 additions & 11 deletions go/vt/vttablet/heartbeat/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
Expand All @@ -49,7 +48,7 @@ const (
// table against the current time at read time. This value is reported in metrics and
// also to the healthchecks.
type Reader struct {
dbconfigs *dbconfigs.DBConfigs
env tabletenv.Env

enabled bool
interval time.Duration
Expand All @@ -69,33 +68,30 @@ type Reader struct {
}

// NewReader returns a new heartbeat reader.
func NewReader(checker connpool.MySQLChecker, config tabletenv.TabletConfig) *Reader {
func NewReader(env tabletenv.Env) *Reader {
config := env.Config()
if !config.HeartbeatEnable {
return &Reader{}
}

return &Reader{
env: env,
enabled: true,
now: time.Now,
interval: config.HeartbeatInterval,
ticks: timer.NewTimer(config.HeartbeatInterval),
errorLog: logutil.NewThrottledLogger("HeartbeatReporter", 60*time.Second),
pool: connpool.New(config.PoolNamePrefix+"HeartbeatReadPool", 1, 0, time.Duration(config.IdleTimeout*1e9), checker),
pool: connpool.New(env, config.PoolNamePrefix+"HeartbeatReadPool", 1, 0, time.Duration(config.IdleTimeout*1e9)),
}
}

// InitDBConfig must be called before Init.
func (r *Reader) InitDBConfig(dbcfgs *dbconfigs.DBConfigs) {
r.dbconfigs = dbcfgs
}

// Init does last minute initialization of db settings, such as dbName
// and keyspaceShard
func (r *Reader) Init(target querypb.Target) {
if !r.enabled {
return
}
r.dbName = sqlescape.EscapeID(r.dbconfigs.SidecarDBName.Get())
r.dbName = sqlescape.EscapeID(r.env.DBConfigs().SidecarDBName.Get())
r.keyspaceShard = fmt.Sprintf("%s:%s", target.Keyspace, target.Shard)
}

Expand All @@ -112,7 +108,7 @@ func (r *Reader) Open() {
}

log.Info("Beginning heartbeat reads")
r.pool.Open(r.dbconfigs.AppWithDB(), r.dbconfigs.DbaWithDB(), r.dbconfigs.AppDebugWithDB())
r.pool.Open(r.env.DBConfigs().AppWithDB(), r.env.DBConfigs().DbaWithDB(), r.env.DBConfigs().AppDebugWithDB())
r.ticks.Start(func() { r.readHeartbeat() })
r.isOpen = true
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/heartbeat/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func newReader(db *fakesqldb.DB, nowFunc func() time.Time) *Reader {
cp := *params
dbc := dbconfigs.NewTestDBConfigs(cp, cp, "")

tr := NewReader(&fakeMysqlChecker{}, config)
tr := NewReader(tabletenv.NewTestEnv(&config, nil))
tr.dbName = sqlescape.EscapeID(dbc.SidecarDBName.Get())
tr.keyspaceShard = "test:0"
tr.now = nowFunc
Expand Down
19 changes: 8 additions & 11 deletions go/vt/vttablet/heartbeat/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const (
// Writer runs on master tablets and writes heartbeats to the _vt.heartbeat
// table at a regular interval, defined by heartbeat_interval.
type Writer struct {
dbconfigs *dbconfigs.DBConfigs
env tabletenv.Env

enabled bool
interval time.Duration
Expand All @@ -73,26 +73,23 @@ type Writer struct {
}

// NewWriter creates a new Writer.
func NewWriter(checker connpool.MySQLChecker, alias topodatapb.TabletAlias, config tabletenv.TabletConfig) *Writer {
func NewWriter(env tabletenv.Env, alias topodatapb.TabletAlias) *Writer {
config := env.Config()
if !config.HeartbeatEnable {
return &Writer{}
}
return &Writer{
env: env,
enabled: true,
tabletAlias: alias,
now: time.Now,
interval: config.HeartbeatInterval,
ticks: timer.NewTimer(config.HeartbeatInterval),
errorLog: logutil.NewThrottledLogger("HeartbeatWriter", 60*time.Second),
pool: connpool.New(config.PoolNamePrefix+"HeartbeatWritePool", 1, 0, time.Duration(config.IdleTimeout*1e9), checker),
pool: connpool.New(env, config.PoolNamePrefix+"HeartbeatWritePool", 1, 0, time.Duration(config.IdleTimeout*1e9)),
}
}

// InitDBConfig must be called before Init.
func (w *Writer) InitDBConfig(dbcfgs *dbconfigs.DBConfigs) {
w.dbconfigs = dbcfgs
}

// Init runs at tablet startup and last minute initialization of db settings, and
// creates the necessary tables for heartbeat.
func (w *Writer) Init(target querypb.Target) error {
Expand All @@ -102,10 +99,10 @@ func (w *Writer) Init(target querypb.Target) error {
w.mu.Lock()
defer w.mu.Unlock()
log.Info("Initializing heartbeat table.")
w.dbName = sqlescape.EscapeID(w.dbconfigs.SidecarDBName.Get())
w.dbName = sqlescape.EscapeID(w.env.DBConfigs().SidecarDBName.Get())
w.keyspaceShard = fmt.Sprintf("%s:%s", target.Keyspace, target.Shard)

err := w.initializeTables(w.dbconfigs.DbaWithDB())
err := w.initializeTables(w.env.DBConfigs().DbaWithDB())
if err != nil {
w.recordError(err)
return err
Expand All @@ -128,7 +125,7 @@ func (w *Writer) Open() {
return
}
log.Info("Beginning heartbeat writes")
w.pool.Open(w.dbconfigs.AppWithDB(), w.dbconfigs.DbaWithDB(), w.dbconfigs.AppDebugWithDB())
w.pool.Open(w.env.DBConfigs().AppWithDB(), w.env.DBConfigs().DbaWithDB(), w.env.DBConfigs().AppDebugWithDB())
w.ticks.Start(func() { w.writeHeartbeat() })
w.isOpen = true
}
Expand Down
8 changes: 1 addition & 7 deletions go/vt/vttablet/heartbeat/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,11 @@ func newTestWriter(db *fakesqldb.DB, nowFunc func() time.Time) *Writer {
cp := *params
dbc := dbconfigs.NewTestDBConfigs(cp, cp, "")

tw := NewWriter(&fakeMysqlChecker{},
topodatapb.TabletAlias{Cell: "test", Uid: 1111},
config)
tw := NewWriter(tabletenv.NewTestEnv(&config, nil), topodatapb.TabletAlias{Cell: "test", Uid: 1111})
tw.dbName = sqlescape.EscapeID(dbc.SidecarDBName.Get())
tw.keyspaceShard = "test:0"
tw.now = nowFunc
tw.pool.Open(dbc.AppWithDB(), dbc.DbaWithDB(), dbc.AppDebugWithDB())

return tw
}

type fakeMysqlChecker struct{}

func (f fakeMysqlChecker) CheckMySQL() {}
3 changes: 1 addition & 2 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ func TestMain(m *testing.M) {

// engines cannot be initialized in testenv because it introduces
// circular dependencies.
streamerEngine = vstreamer.NewEngine(env.SrvTopo, env.SchemaEngine)
streamerEngine.InitDBConfig(env.Dbcfgs.DbaWithDB())
streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine)
streamerEngine.Open(env.KeyspaceName, env.Cells[0])
defer streamerEngine.Close()

Expand Down
10 changes: 2 additions & 8 deletions go/vt/vttablet/tabletmanager/vreplication/vstreamer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
Expand Down Expand Up @@ -158,7 +157,8 @@ func (vsClient *MySQLVStreamerClient) Open(ctx context.Context) (err error) {

// Let's create all the required components by vstreamer

vsClient.sourceSe = schema.NewEngine(checker{}, tabletenv.DefaultQsConfig)
config := tabletenv.DefaultQsConfig
vsClient.sourceSe = schema.NewEngine(tabletenv.NewTestEnv(&config, nil))
vsClient.sourceSe.InitDBConfig(vsClient.sourceConnParams)
err = vsClient.sourceSe.Open()
if err != nil {
Expand Down Expand Up @@ -210,9 +210,3 @@ func (vsClient *MySQLVStreamerClient) VStreamRows(ctx context.Context, query str
func InitVStreamerClient(cfg *dbconfigs.DBConfigs) {
dbcfgs = cfg
}

type checker struct{}

var _ = connpool.MySQLChecker(checker{})

func (checker) CheckMySQL() {}
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletserver/connpool/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewDBConn(
appParams dbconfigs.Connector) (*DBConn, error) {
c, err := dbconnpool.NewDBConnection(appParams, tabletenv.MySQLStats)
if err != nil {
cp.checker.CheckMySQL()
cp.env.CheckMySQL()
return nil, err
}
return &DBConn{
Expand Down Expand Up @@ -120,7 +120,7 @@ func (dbc *DBConn) Exec(ctx context.Context, query string, maxrows int, wantfiel
}

if reconnectErr := dbc.reconnect(); reconnectErr != nil {
dbc.pool.checker.CheckMySQL()
dbc.pool.env.CheckMySQL()
// Return the error of the reconnect and not the original connection error.
return nil, reconnectErr
}
Expand Down Expand Up @@ -201,7 +201,7 @@ func (dbc *DBConn) Stream(ctx context.Context, query string, callback func(*sqlt
default:
}
if reconnectErr := dbc.reconnect(); reconnectErr != nil {
dbc.pool.checker.CheckMySQL()
dbc.pool.env.CheckMySQL()
// Return the error of the reconnect and not the original connection error.
return reconnectErr
}
Expand Down
17 changes: 3 additions & 14 deletions go/vt/vttablet/tabletserver/connpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,45 +45,34 @@ var ErrConnPoolClosed = vterrors.New(vtrpcpb.Code_INTERNAL, "internal error: une
// through non-test code.
var usedNames = make(map[string]bool)

// MySQLChecker defines the CheckMySQL interface that lower
// level objects can use to call back into TabletServer.
type MySQLChecker interface {
CheckMySQL()
}

// Pool implements a custom connection pool for tabletserver.
// It's similar to dbconnpool.ConnPool, but the connections it creates
// come with built-in ability to kill in-flight queries. These connections
// also trigger a CheckMySQL call if we fail to connect to MySQL.
// Other than the connection type, ConnPool maintains an additional
// pool of dba connections that are used to kill connections.
type Pool struct {
env tabletenv.Env
name string
mu sync.Mutex
connections *pools.ResourcePool
capacity int
prefillParallelism int
idleTimeout time.Duration
dbaPool *dbconnpool.ConnectionPool
checker MySQLChecker
appDebugParams dbconfigs.Connector
}

// New creates a new Pool. The name is used
// to publish stats only.
func New(
name string,
capacity int,
prefillParallelism int,
idleTimeout time.Duration,
checker MySQLChecker) *Pool {
func New(env tabletenv.Env, name string, capacity int, prefillParallelism int, idleTimeout time.Duration) *Pool {
cp := &Pool{
env: env,
name: name,
capacity: capacity,
prefillParallelism: prefillParallelism,
idleTimeout: idleTimeout,
dbaPool: dbconnpool.NewConnectionPool("", 1, idleTimeout, 0),
checker: checker,
}
if name == "" || usedNames[name] {
return cp
Expand Down
10 changes: 2 additions & 8 deletions go/vt/vttablet/tabletserver/connpool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

"golang.org/x/net/context"
)
Expand Down Expand Up @@ -220,19 +221,12 @@ func TestConnPoolStateWhilePoolIsOpen(t *testing.T) {
}
}

type dummyChecker struct {
}

func (dummyChecker) CheckMySQL() {}

var checker = dummyChecker{}

func newPool() *Pool {
return New(
tabletenv.NewTestEnv(nil, nil),
fmt.Sprintf("TestPool%d", rand.Int63()),
100,
0,
10*time.Second,
checker,
)
}
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletserver/messager/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
// TabletService defines the functions of TabletServer
// that the messager needs for callback.
type TabletService interface {
tabletenv.Env
PostponeMessages(ctx context.Context, target *querypb.Target, name string, ids []string) (count int64, err error)
PurgeMessages(ctx context.Context, target *querypb.Target, name string, timeCutoff int64) (count int64, err error)
}
Expand All @@ -60,12 +61,12 @@ type Engine struct {
}

// NewEngine creates a new Engine.
func NewEngine(tsv TabletService, se *schema.Engine, vs VStreamer, config tabletenv.TabletConfig) *Engine {
func NewEngine(tsv TabletService, se *schema.Engine, vs VStreamer) *Engine {
return &Engine{
tsv: tsv,
se: se,
vs: vs,
postponeSema: sync2.NewSemaphore(config.MessagePostponeCap, 0),
postponeSema: sync2.NewSemaphore(tsv.Config().MessagePostponeCap, 0),
managers: make(map[string]*messageManager),
}
}
Expand Down
8 changes: 5 additions & 3 deletions go/vt/vttablet/tabletserver/messager/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,9 +175,11 @@ func newTestEngine(db *fakesqldb.DB) *Engine {
randID := rand.Int63()
config := tabletenv.DefaultQsConfig
config.PoolNamePrefix = fmt.Sprintf("Pool-%d-", randID)
tsv := newFakeTabletServer()
se := schema.NewEngine(tsv, config)
te := NewEngine(tsv, se, newFakeVStreamer(), config)
tsv := &fakeTabletServer{
Env: tabletenv.NewTestEnv(&config, nil),
}
se := schema.NewEngine(tsv)
te := NewEngine(tsv, se, newFakeVStreamer())
te.Open()
return te
}
Expand Down
9 changes: 8 additions & 1 deletion go/vt/vttablet/tabletserver/messager/message_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -810,14 +811,20 @@ func TestMMGenerateWithBackoff(t *testing.T) {
}

type fakeTabletServer struct {
tabletenv.Env
postponeCount sync2.AtomicInt64
purgeCount sync2.AtomicInt64

mu sync.Mutex
ch chan string
}

func newFakeTabletServer() *fakeTabletServer { return &fakeTabletServer{} }
func newFakeTabletServer() *fakeTabletServer {
config := tabletenv.DefaultQsConfig
return &fakeTabletServer{
Env: tabletenv.NewTestEnv(&config, nil),
}
}

func (fts *fakeTabletServer) CheckMySQL() {}

Expand Down
Loading