Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ type DB struct {
// if fakesqldb is asked to serve queries or query patterns that it has not been explicitly told about it will
// error out by default. However if you set this flag then any unmatched query results in an empty result
neverFail atomic.Bool

// lastError stores the last error in returning a query result.
lastErrorMu sync.Mutex
lastError error
}

// QueryHandler is the interface used by the DB to simulate executed queries
Expand Down Expand Up @@ -176,6 +180,7 @@ func New(t testing.TB) *DB {
connections: make(map[uint32]*mysql.Conn),
queryPatternUserCallback: make(map[*regexp.Regexp]func(string)),
patternData: make(map[string]exprResult),
lastErrorMu: sync.Mutex{},
}

db.Handler = db
Expand Down Expand Up @@ -245,6 +250,13 @@ func (db *DB) CloseAllConnections() {
}
}

// LastError gives the last error the DB ran into
func (db *DB) LastError() error {
db.lastErrorMu.Lock()
defer db.lastErrorMu.Unlock()
return db.lastError
}

// WaitForClose should be used after CloseAllConnections() is closed and
// you want to provoke a MySQL client error with errno 2006.
//
Expand Down Expand Up @@ -342,7 +354,14 @@ func (db *DB) WarningCount(c *mysql.Conn) uint16 {
}

// HandleQuery is the default implementation of the QueryHandler interface
func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) error {
func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) (err error) {
defer func() {
if err != nil {
db.lastErrorMu.Lock()
db.lastError = err
db.lastErrorMu.Unlock()
}
}()
if db.allowAll.Load() {
return callback(&sqltypes.Result{})
}
Expand Down Expand Up @@ -413,7 +432,7 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
return callback(&sqltypes.Result{})
}
// Nothing matched.
err := fmt.Errorf("fakesqldb:: query: '%s' is not supported on %v",
err = fmt.Errorf("fakesqldb:: query: '%s' is not supported on %v",
sqlparser.TruncateForUI(query), db.name)
log.Errorf("Query not found: %s", sqlparser.TruncateForUI(query))

Expand Down
26 changes: 22 additions & 4 deletions go/vt/vttablet/tabletserver/health_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,8 @@ import (

"github.com/spf13/pflag"

"vitess.io/vitess/go/vt/servenv"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/servenv"

"vitess.io/vitess/go/vt/sqlparser"

Expand Down Expand Up @@ -81,6 +80,8 @@ type healthStreamer struct {
cancel context.CancelFunc
clients map[chan *querypb.StreamHealthResponse]struct{}
state *querypb.StreamHealthResponse
// isServingPrimary stores if this tablet is currently the serving primary or not.
isServingPrimary bool

history *history.History

Expand Down Expand Up @@ -310,6 +311,21 @@ func (hs *healthStreamer) AppendDetails(details []*kv) []*kv {
return details
}

// MakePrimary tells the healthstreamer that the current tablet is now the primary,
// so it can read and write to the MySQL instance for schema-tracking.
func (hs *healthStreamer) MakePrimary(serving bool) {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.isServingPrimary = serving
}

// MakeNonPrimary tells the healthstreamer that the current tablet is now not a primary.
func (hs *healthStreamer) MakeNonPrimary() {
hs.mu.Lock()
defer hs.mu.Unlock()
hs.isServingPrimary = false
}

func (hs *healthStreamer) SetUnhealthyThreshold(v time.Duration) {
hs.unhealthyThreshold.Set(v)
shr := proto.Clone(hs.state).(*querypb.StreamHealthResponse)
Expand All @@ -328,8 +344,10 @@ func (hs *healthStreamer) SetUnhealthyThreshold(v time.Duration) {
func (hs *healthStreamer) reload() error {
hs.mu.Lock()
defer hs.mu.Unlock()
// Schema Reload to happen only on primary.
if hs.state.Target.TabletType != topodatapb.TabletType_PRIMARY {
// Schema Reload to happen only on primary when it is serving.
// We can be in a state when the primary is not serving after we have run DemotePrimary. In that case,
// we don't want to run any queries in MySQL, so we shouldn't reload anything in the healthStreamer.
if !hs.isServingPrimary {
return nil
}

Expand Down
38 changes: 36 additions & 2 deletions go/vt/vttablet/tabletserver/health_streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/sync2"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
Expand Down Expand Up @@ -60,6 +60,36 @@ func newConfig(db *fakesqldb.DB) *tabletenv.TabletConfig {
return cfg
}

// TestNotServingPrimaryNoWrite makes sure that the health-streamer doesn't write anything to the database when
// the state is not serving primary.
func TestNotServingPrimaryNoWrite(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
config := newConfig(db)
config.SignalWhenSchemaChange = true

env := tabletenv.NewEnv(config, "TestNotServingPrimary")
alias := &topodatapb.TabletAlias{
Cell: "cell",
Uid: 1,
}
// Create a new health streamer and set it to a serving primary state
hs := newHealthStreamer(env, alias)
hs.isServingPrimary = true
hs.InitDBConfig(&querypb.Target{TabletType: topodatapb.TabletType_PRIMARY}, config.DB.DbaWithDB())
hs.Open()
defer hs.Close()

// Let's say the tablet goes to a non-serving primary state.
hs.MakePrimary(false)

// A reload now should not write anything to the database. If any write happens it will error out since we have not
// added any query to the database to expect.
err := hs.reload()
require.NoError(t, err)
require.NoError(t, db.LastError())
}

func TestHealthStreamerBroadcast(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
Expand Down Expand Up @@ -171,6 +201,7 @@ func TestReloadSchema(t *testing.T) {
}
blpFunc = testBlpFunc
hs := newHealthStreamer(env, alias)
hs.MakePrimary(true)

target := &querypb.Target{TabletType: topodatapb.TabletType_PRIMARY}
configs := config.DB
Expand Down Expand Up @@ -231,6 +262,7 @@ func TestDoesNotReloadSchema(t *testing.T) {
}
blpFunc = testBlpFunc
hs := newHealthStreamer(env, alias)
hs.MakePrimary(true)

target := &querypb.Target{TabletType: topodatapb.TabletType_PRIMARY}
configs := config.DB
Expand Down Expand Up @@ -283,6 +315,7 @@ func TestInitialReloadSchema(t *testing.T) {
}
blpFunc = testBlpFunc
hs := newHealthStreamer(env, alias)
hs.MakePrimary(true)

target := &querypb.Target{TabletType: topodatapb.TabletType_PRIMARY}
configs := config.DB
Expand Down Expand Up @@ -341,6 +374,7 @@ func TestReloadView(t *testing.T) {
env := tabletenv.NewEnv(config, "TestReloadView")
alias := &topodatapb.TabletAlias{Cell: "cell", Uid: 1}
hs := newHealthStreamer(env, alias)
hs.MakePrimary(true)

target := &querypb.Target{TabletType: topodatapb.TabletType_PRIMARY}
configs := config.DB
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ func (sm *stateManager) servePrimary() error {
return err
}

sm.hs.MakePrimary(true)
sm.rt.MakePrimary()
sm.tracker.Open()
// We instantly kill all stateful queries to allow for
Expand All @@ -467,6 +468,7 @@ func (sm *stateManager) unservePrimary() error {
return err
}

sm.hs.MakePrimary(false)
sm.rt.MakePrimary()
sm.setState(topodatapb.TabletType_PRIMARY, StateNotServing)
return nil
Expand All @@ -483,6 +485,7 @@ func (sm *stateManager) serveNonPrimary(wantTabletType topodatapb.TabletType) er
sm.messager.Close()
sm.tracker.Close()
sm.se.MakeNonPrimary()
sm.hs.MakeNonPrimary()

if err := sm.connect(wantTabletType); err != nil {
return err
Expand All @@ -500,6 +503,7 @@ func (sm *stateManager) unserveNonPrimary(wantTabletType topodatapb.TabletType)
sm.unserveCommon()

sm.se.MakeNonPrimary()
sm.hs.MakeNonPrimary()

if err := sm.connect(wantTabletType); err != nil {
return err
Expand Down