Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go/vt/vttablet/endtoend/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ func TestSchemaVersioning(t *testing.T) {
tsv := framework.Server
tsv.EnableHistorian(false)
tsv.SetTracking(false)
tsv.EnableHeartbeat(false)
defer tsv.EnableHeartbeat(true)
defer tsv.EnableHistorian(true)
defer tsv.SetTracking(true)

Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletserver/repltracker/repltracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,9 @@ func (rt *ReplTracker) Status() (time.Duration, error) {
// rt.mode == tabletenv.Poller
return rt.poller.Status()
}

// EnableHeartbeat enables or disables writes of heartbeat. This functionality
// is only used by tests.
func (rt *ReplTracker) EnableHeartbeat(enable bool) {
rt.hw.enableWrites(enable)
}
13 changes: 11 additions & 2 deletions go/vt/vttablet/tabletserver/repltracker/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (w *heartbeatWriter) Open() {
log.Info("Hearbeat Writer: opening")

w.pool.Open(w.env.Config().DB.AppWithDB(), w.env.Config().DB.DbaWithDB(), w.env.Config().DB.AppDebugWithDB())
w.ticks.Start(w.writeHeartbeat)
w.enableWrites(true)
w.isOpen = true
}

Expand All @@ -126,7 +126,7 @@ func (w *heartbeatWriter) Close() {
return
}

w.ticks.Stop()
w.enableWrites(false)
w.pool.Close()
w.isOpen = false
log.Info("Hearbeat Writer: closed")
Expand Down Expand Up @@ -182,3 +182,12 @@ func (w *heartbeatWriter) recordError(err error) {
w.errorLog.Errorf("%v", err)
writeErrors.Add(1)
}

// enableWrites actives or deactives heartbeat writes
func (w *heartbeatWriter) enableWrites(enable bool) {
if enable {
w.ticks.Start(w.writeHeartbeat)
} else {
w.ticks.Stop()
}
}
27 changes: 13 additions & 14 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ var (
enableHotRowProtectionDryRun bool
enableConsolidator bool
enableConsolidatorReplicas bool
enableHeartbeat bool
heartbeatInterval time.Duration
healthCheckInterval time.Duration
degradedThreshold time.Duration
Expand Down Expand Up @@ -140,8 +139,9 @@ func init() {
flag.BoolVar(&currentConfig.TransactionLimitByComponent, "transaction_limit_by_component", defaultConfig.TransactionLimitByComponent, "Include CallerID.component when considering who the user is for the purpose of transaction limit.")
flag.BoolVar(&currentConfig.TransactionLimitBySubcomponent, "transaction_limit_by_subcomponent", defaultConfig.TransactionLimitBySubcomponent, "Include CallerID.subcomponent when considering who the user is for the purpose of transaction limit.")

flag.BoolVar(&enableHeartbeat, "heartbeat_enable", false, "If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the table _vt.heartbeat. The result is used to inform the serving state of the vttablet via healthchecks.")
flag.DurationVar(&heartbeatInterval, "heartbeat_interval", 1*time.Second, "How frequently to read and write replication heartbeat.")
var dummyEnableHeartbeat bool
flag.BoolVar(&dummyEnableHeartbeat, "heartbeat_enable", true, "Always enabled, flag to be deprecated. vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the table _vt.heartbeat. The result is used to inform the serving state of the vttablet via healthchecks.")
flag.DurationVar(&heartbeatInterval, "heartbeat_interval", time.Duration(defaultConfig.ReplicationTracker.HeartbeatIntervalSeconds*1000)*time.Millisecond, "How frequently to read and write replication heartbeat. Maximum value: 1sec")

flag.BoolVar(&currentConfig.EnforceStrictTransTables, "enforce_strict_trans_tables", defaultConfig.EnforceStrictTransTables, "If true, vttablet requires MySQL to run with STRICT_TRANS_TABLES or STRICT_ALL_TABLES on. It is recommended to not turn this flag off. Otherwise MySQL may alter your supplied values before saving them to the database.")
flag.BoolVar(&enableConsolidator, "enable-consolidator", true, "This option enables the query consolidator.")
Expand Down Expand Up @@ -182,17 +182,15 @@ func Init() {
currentConfig.Consolidator = Disable
}

switch {
case enableHeartbeat:
currentConfig.ReplicationTracker.Mode = Heartbeat
currentConfig.ReplicationTracker.HeartbeatIntervalSeconds.Set(heartbeatInterval)
case enableReplicationReporter:
currentConfig.ReplicationTracker.Mode = Polling
currentConfig.ReplicationTracker.HeartbeatIntervalSeconds = 0
default:
currentConfig.ReplicationTracker.Mode = Disable
currentConfig.ReplicationTracker.HeartbeatIntervalSeconds = 0
// We hard-code enable heartbeat, and force heartbeat interval onto a reasonable value
if heartbeatInterval == 0 {
heartbeatInterval = time.Duration(defaultConfig.ReplicationTracker.HeartbeatIntervalSeconds*1000) * time.Millisecond
}
if heartbeatInterval > time.Second {
heartbeatInterval = time.Second
}
currentConfig.ReplicationTracker.Mode = Heartbeat
currentConfig.ReplicationTracker.HeartbeatIntervalSeconds.Set(heartbeatInterval)

currentConfig.Healthcheck.IntervalSeconds.Set(healthCheckInterval)
currentConfig.Healthcheck.DegradedThresholdSeconds.Set(degradedThreshold)
Expand Down Expand Up @@ -418,7 +416,8 @@ var defaultConfig = TabletConfig{
UnhealthyThresholdSeconds: 7200,
},
ReplicationTracker: ReplicationTrackerConfig{
Mode: Disable,
Mode: Heartbeat,
HeartbeatIntervalSeconds: 0.25,
},
HotRowProtection: HotRowProtectionConfig{
Mode: Disable,
Expand Down
22 changes: 11 additions & 11 deletions go/vt/vttablet/tabletserver/tabletenv/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ oltpReadPool:
size: 16
queryCacheSize: 5000
replicationTracker:
mode: disable
heartbeatIntervalSeconds: 0.25
mode: heartbeat
schemaReloadIntervalSeconds: 1800
streamBufferSize: 32768
txPool:
Expand Down Expand Up @@ -215,7 +216,8 @@ func TestFlags(t *testing.T) {
want.Healthcheck.IntervalSeconds = 20
want.Healthcheck.DegradedThresholdSeconds = 30
want.Healthcheck.UnhealthyThresholdSeconds = 7200
want.ReplicationTracker.Mode = Disable
want.ReplicationTracker.Mode = Heartbeat
want.ReplicationTracker.HeartbeatIntervalSeconds = 0.25
assert.Equal(t, want.DB, currentConfig.DB)
assert.Equal(t, want, currentConfig)

Expand Down Expand Up @@ -267,31 +269,29 @@ func TestFlags(t *testing.T) {
want.Consolidator = Disable
assert.Equal(t, want, currentConfig)

enableHeartbeat = true
heartbeatInterval = 1 * time.Second
currentConfig.ReplicationTracker.Mode = ""
currentConfig.ReplicationTracker.HeartbeatIntervalSeconds = 0
currentConfig.ReplicationTracker.HeartbeatIntervalSeconds = 0.2
Init()
want.ReplicationTracker.Mode = Heartbeat
want.ReplicationTracker.HeartbeatIntervalSeconds = 1
assert.Equal(t, want, currentConfig)

enableHeartbeat = false
heartbeatInterval = 1 * time.Second
heartbeatInterval = 5 * time.Second
currentConfig.ReplicationTracker.Mode = ""
currentConfig.ReplicationTracker.HeartbeatIntervalSeconds = 0
Init()
want.ReplicationTracker.Mode = Disable
want.ReplicationTracker.HeartbeatIntervalSeconds = 0
want.ReplicationTracker.Mode = Heartbeat
want.ReplicationTracker.HeartbeatIntervalSeconds = 1
assert.Equal(t, want, currentConfig)

enableReplicationReporter = true
heartbeatInterval = 1 * time.Second
heartbeatInterval = 0 * time.Second
currentConfig.ReplicationTracker.Mode = ""
currentConfig.ReplicationTracker.HeartbeatIntervalSeconds = 0
Init()
want.ReplicationTracker.Mode = Polling
want.ReplicationTracker.HeartbeatIntervalSeconds = 0
want.ReplicationTracker.Mode = Heartbeat
want.ReplicationTracker.HeartbeatIntervalSeconds = 0.25
assert.Equal(t, want, currentConfig)

healthCheckInterval = 1 * time.Second
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1455,6 +1455,12 @@ func (tsv *TabletServer) registerTwopczHandler() {
})
}

// EnableHeartbeat forces heartbeat to be on or off.
// Only to be used for testing.
func (tsv *TabletServer) EnableHeartbeat(enabled bool) {
tsv.rt.EnableHeartbeat(enabled)
}

// SetTracking forces tracking to be on or off.
// Only to be used for testing.
func (tsv *TabletServer) SetTracking(enabled bool) {
Expand Down