Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions go/mysql/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,13 +761,7 @@ func (c *Conn) handleNextCommand(handler Handler) error {
data, err := c.readEphemeralPacket()
if err != nil {
// Don't log EOF errors. They cause too much spam.
// Note the EOF detection is not 100%
// guaranteed, in the case where the client
// connection is already closed before we call
// 'readEphemeralPacket'. This is a corner
// case though, and very unlikely to happen,
// and the only downside is we log a bit more then.
if err != io.EOF {
if err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") {
log.Errorf("Error reading packet from %s: %v", c, err)
}
return err
Expand Down
6 changes: 6 additions & 0 deletions go/vt/dbconfigs/dbconfigs.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/yaml2"
)

Expand Down Expand Up @@ -190,6 +192,10 @@ func (c Connector) Connect(ctx context.Context) (*mysql.Conn, error) {

// MysqlParams returns the connections params
func (c Connector) MysqlParams() (*mysql.ConnParams, error) {
if c.connParams == nil {
// This is only possible during tests.
return nil, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "parameters are empty")
}
params, err := withCredentials(c.connParams)
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/endtoend/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ func TestSchemaVersioningLongDDL(t *testing.T) {
}

func runCases(ctx context.Context, t *testing.T, tests []test, eventCh chan []*binlogdatapb.VEvent) {
t.Helper()
client := framework.NewClient()

for _, test := range tests {
Expand Down
7 changes: 2 additions & 5 deletions go/vt/vttablet/heartbeat/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,8 @@ func NewReader(env tabletenv.Env) *Reader {
}
}

// Init does last minute initialization of db settings, such as keyspaceShard.
func (r *Reader) Init(target querypb.Target) {
if !r.enabled {
return
}
// InitDBConfig initializes the target name for the Reader.
func (r *Reader) InitDBConfig(target querypb.Target) {
r.keyspaceShard = fmt.Sprintf("%s:%s", target.Keyspace, target.Shard)
}

Expand Down
87 changes: 21 additions & 66 deletions go/vt/vttablet/heartbeat/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,12 @@ import (
"sync"
"time"

"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/withddl"

"golang.org/x/net/context"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/sqlparser"
Expand All @@ -46,10 +44,14 @@ const (
tabletUid INT UNSIGNED NOT NULL,
ts BIGINT UNSIGNED NOT NULL
) engine=InnoDB`
sqlInsertInitialRow = "INSERT INTO %s.heartbeat (ts, tabletUid, keyspaceShard) VALUES (%a, %a, %a) ON DUPLICATE KEY UPDATE ts=VALUES(ts)"
sqlUpdateHeartbeat = "UPDATE %s.heartbeat SET ts=%a, tabletUid=%a WHERE keyspaceShard=%a"
sqlUpsertHeartbeat = "INSERT INTO %s.heartbeat (ts, tabletUid, keyspaceShard) VALUES (%a, %a, %a) ON DUPLICATE KEY UPDATE ts=VALUES(ts), tabletUid=VALUES(tabletUid)"
)

var withDDL = withddl.New([]string{
fmt.Sprintf(sqlCreateSidecarDB, "_vt"),
fmt.Sprintf(sqlCreateHeartbeatTable, "_vt"),
})

// Writer runs on master tablets and writes heartbeats to the _vt.heartbeat
// table at a regular interval, defined by heartbeat_interval.
type Writer struct {
Expand Down Expand Up @@ -90,25 +92,9 @@ func NewWriter(env tabletenv.Env, alias topodatapb.TabletAlias) *Writer {
}
}

// Init runs at tablet startup and last minute initialization of db settings, and
// creates the necessary tables for heartbeat.
func (w *Writer) Init(target querypb.Target) error {
if !w.enabled {
return nil
}
w.mu.Lock()
defer w.mu.Unlock()
log.Info("Initializing heartbeat table.")
// InitDBConfig initializes the target name for the Writer.
func (w *Writer) InitDBConfig(target querypb.Target) {
w.keyspaceShard = fmt.Sprintf("%s:%s", target.Keyspace, target.Shard)

if target.TabletType == topodatapb.TabletType_MASTER {
err := w.initializeTables(w.env.Config().DB.AppWithDB())
if err != nil {
w.recordError(err)
return err
}
}
return nil
}

// Open sets up the Writer's db connection and launches the ticker
Expand All @@ -124,9 +110,10 @@ func (w *Writer) Open() {
if w.isOpen {
return
}

log.Info("Beginning heartbeat writes")
w.pool.Open(w.env.Config().DB.AppWithDB(), w.env.Config().DB.DbaWithDB(), w.env.Config().DB.AppDebugWithDB())
w.ticks.Start(func() { w.writeHeartbeat() })
w.ticks.Start(w.writeHeartbeat)
w.isOpen = true
}

Expand All @@ -147,36 +134,6 @@ func (w *Writer) Close() {
w.isOpen = false
}

// initializeTables attempts to create the heartbeat tables and record an
// initial row. The row is created only on master and is replicated to all
// other servers.
func (w *Writer) initializeTables(cp dbconfigs.Connector) error {
conn, err := dbconnpool.NewDBConnection(context.TODO(), cp)
if err != nil {
return vterrors.Wrap(err, "Failed to create connection for heartbeat")
}
defer conn.Close()
statements := []string{
fmt.Sprintf(sqlCreateSidecarDB, "_vt"),
fmt.Sprintf(sqlCreateHeartbeatTable, "_vt"),
}
for _, s := range statements {
if _, err := conn.ExecuteFetch(s, 0, false); err != nil {
return vterrors.Wrap(err, "Failed to execute heartbeat init query")
}
}
insert, err := w.bindHeartbeatVars(sqlInsertInitialRow)
if err != nil {
return vterrors.Wrap(err, "Failed to bindHeartbeatVars initial heartbeat insert")
}
_, err = conn.ExecuteFetch(insert, 0, false)
if err != nil {
return vterrors.Wrap(err, "Failed to execute initial heartbeat insert")
}
writes.Add(1)
return nil
}

// bindHeartbeatVars takes a heartbeat write (insert or update) and
// adds the necessary fields to the query as bind vars. This is done
// to protect ourselves against a badly formed keyspace or shard name.
Expand All @@ -196,29 +153,27 @@ func (w *Writer) bindHeartbeatVars(query string) (string, error) {

// writeHeartbeat updates the heartbeat row for this tablet with the current time in nanoseconds.
func (w *Writer) writeHeartbeat() {
defer w.env.LogError()
ctx, cancel := context.WithDeadline(context.Background(), w.now().Add(w.interval))
defer cancel()
update, err := w.bindHeartbeatVars(sqlUpdateHeartbeat)
if err != nil {
w.recordError(err)
return
}
err = w.exec(ctx, update)
if err != nil {
if err := w.write(); err != nil {
w.recordError(err)
return
}
writes.Add(1)
}

func (w *Writer) exec(ctx context.Context, query string) error {
func (w *Writer) write() error {
defer w.env.LogError()
ctx, cancel := context.WithDeadline(context.Background(), w.now().Add(w.interval))
defer cancel()
upsert, err := w.bindHeartbeatVars(sqlUpsertHeartbeat)
if err != nil {
return err
}
conn, err := w.pool.Get(ctx)
if err != nil {
return err
}
defer conn.Recycle()
_, err = conn.Exec(ctx, query, 0, false)
_, err = withDDL.Exec(ctx, upsert, conn.Exec)
if err != nil {
return err
}
Expand Down
54 changes: 22 additions & 32 deletions go/vt/vttablet/heartbeat/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"gotest.tools/assert"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconfigs"
Expand All @@ -35,55 +38,46 @@ var (
}
)

// TestCreateSchema tests that our initial INSERT uses
// the proper arguments. It also sanity checks the other init
// queries for completeness, and verifies that we return any
// failure that is encountered.
func TestCreateSchema(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
tw := newTestWriter(db, mockNowFunc)
defer tw.Close()
writes.Reset()

db.AddQuery(fmt.Sprintf(sqlCreateHeartbeatTable, "_vt"), &sqltypes.Result{})
db.AddQuery(fmt.Sprintf("INSERT INTO %s.heartbeat (ts, tabletUid, keyspaceShard) VALUES (%d, %d, '%s') ON DUPLICATE KEY UPDATE ts=VALUES(ts)", "_vt", now.UnixNano(), tw.tabletAlias.Uid, tw.keyspaceShard), &sqltypes.Result{})
if err := tw.initializeTables(db.ConnParams()); err == nil {
t.Fatal("initializeTables() should not have succeeded")
db.OrderMatters()
upsert := fmt.Sprintf("INSERT INTO %s.heartbeat (ts, tabletUid, keyspaceShard) VALUES (%d, %d, '%s') ON DUPLICATE KEY UPDATE ts=VALUES(ts), tabletUid=VALUES(tabletUid)",
"_vt", now.UnixNano(), tw.tabletAlias.Uid, tw.keyspaceShard)
failInsert := fakesqldb.ExpectedExecuteFetch{
Query: upsert,
Error: mysql.NewSQLError(mysql.ERBadDb, "", "bad db error"),
}
db.AddExpectedExecuteFetch(failInsert)
db.AddExpectedQuery(fmt.Sprintf(sqlCreateSidecarDB, "_vt"), nil)
db.AddExpectedQuery(fmt.Sprintf(sqlCreateHeartbeatTable, "_vt"), nil)
db.AddExpectedQuery(upsert, nil)

db.AddQuery(fmt.Sprintf(sqlCreateSidecarDB, "_vt"), &sqltypes.Result{})
if err := tw.initializeTables(db.ConnParams()); err != nil {
t.Fatalf("Should not be in error: %v", err)
}

if got, want := writes.Get(), int64(1); got != want {
t.Fatalf("wrong writes count: got = %v, want = %v", got, want)
}
err := tw.write()
require.NoError(t, err)
}

// TestWriteHearbeat ensures the proper arguments for the UPDATE query
// and writes get recorded in counters.
func TestWriteHeartbeat(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()

tw := newTestWriter(db, mockNowFunc)
db.AddQuery(fmt.Sprintf("UPDATE %s.heartbeat SET ts=%d, tabletUid=%d WHERE keyspaceShard='%s'", "_vt", now.UnixNano(), tw.tabletAlias.Uid, tw.keyspaceShard), &sqltypes.Result{})
upsert := fmt.Sprintf("INSERT INTO %s.heartbeat (ts, tabletUid, keyspaceShard) VALUES (%d, %d, '%s') ON DUPLICATE KEY UPDATE ts=VALUES(ts), tabletUid=VALUES(tabletUid)",
"_vt", now.UnixNano(), tw.tabletAlias.Uid, tw.keyspaceShard)
db.AddQuery(upsert, &sqltypes.Result{})

writes.Reset()
writeErrors.Reset()

tw.writeHeartbeat()
if got, want := writes.Get(), int64(1); got != want {
t.Fatalf("wrong writes count: got = %v; want = %v", got, want)
}
if got, want := writeErrors.Get(), int64(0); got != want {
t.Fatalf("wrong write errors count: got = %v; want = %v", got, want)
}
assert.Equal(t, int64(1), writes.Get())
assert.Equal(t, int64(0), writeErrors.Get())
}

// TestWriteHeartbeatError ensures that we properly account for write errors.
func TestWriteHeartbeatError(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
Expand All @@ -94,12 +88,8 @@ func TestWriteHeartbeatError(t *testing.T) {
writeErrors.Reset()

tw.writeHeartbeat()
if got, want := writes.Get(), int64(0); got != want {
t.Fatalf("wrong writes count: got = %v; want = %v", got, want)
}
if got, want := writeErrors.Get(), int64(1); got != want {
t.Fatalf("wrong write errors count: got = %v; want = %v", got, want)
}
assert.Equal(t, int64(0), writes.Get())
assert.Equal(t, int64(1), writeErrors.Get())
}

func newTestWriter(db *fakesqldb.DB, nowFunc func() time.Time) *Writer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,15 @@ func (ec *externalConnector) Get(name string) (*mysqlConnector, error) {
c := &mysqlConnector{}
c.env = tabletenv.NewEnv(config, name)
c.se = schema.NewEngine(c.env)
c.vstreamer = vstreamer.NewEngine(c.env, nil, c.se)
c.vstreamer = vstreamer.NewEngine(c.env, nil, c.se, "")
c.vstreamer.InitDBConfig("")
c.se.InitDBConfig(c.env.Config().DB.DbaWithDB())

// Open
if err := c.se.Open(); err != nil {
return nil, vterrors.Wrapf(err, "external mysqlConnector: %v", name)
}
c.vstreamer.Open("", "")
c.vstreamer.Open()

// Register
ec.connectors[name] = c
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ func TestMain(m *testing.M) {

// engines cannot be initialized in testenv because it introduces
// circular dependencies.
streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine)
streamerEngine.Open(env.KeyspaceName, env.Cells[0])
streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, env.Cells[0])
streamerEngine.InitDBConfig(env.KeyspaceName)
streamerEngine.Open()
defer streamerEngine.Close()

if err := env.Mysqld.ExecuteSuperQuery(context.Background(), fmt.Sprintf("create database %s", vrepldb)); err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1901,9 +1901,7 @@ func TestRestartOnVStreamEnd(t *testing.T) {
expectDBClientQueries(t, []string{
"/update _vt.vreplication set message='vstream ended'",
})
if err := streamerEngine.Open(env.KeyspaceName, env.ShardName); err != nil {
t.Fatal(err)
}
streamerEngine.Open()

execStatements(t, []string{
"insert into t1 values(2, 'aaa')",
Expand Down
Loading