diff --git a/go/vt/vttablet/endtoend/vstreamer_test.go b/go/vt/vttablet/endtoend/vstreamer_test.go index 24e3e8a6204..b6fbc09f779 100644 --- a/go/vt/vttablet/endtoend/vstreamer_test.go +++ b/go/vt/vttablet/endtoend/vstreamer_test.go @@ -58,6 +58,8 @@ func TestSchemaVersioning(t *testing.T) { tsv := framework.Server tsv.EnableHistorian(false) tsv.SetTracking(false) + tsv.EnableHeartbeat(false) + defer tsv.EnableHeartbeat(true) defer tsv.EnableHistorian(true) defer tsv.SetTracking(true) diff --git a/go/vt/vttablet/tabletserver/repltracker/repltracker.go b/go/vt/vttablet/tabletserver/repltracker/repltracker.go index 7c3013bc67f..99e0b7444b4 100644 --- a/go/vt/vttablet/tabletserver/repltracker/repltracker.go +++ b/go/vt/vttablet/tabletserver/repltracker/repltracker.go @@ -128,3 +128,9 @@ func (rt *ReplTracker) Status() (time.Duration, error) { // rt.mode == tabletenv.Poller return rt.poller.Status() } + +// EnableHeartbeat enables or disables writes of heartbeat. This functionality +// is only used by tests. +func (rt *ReplTracker) EnableHeartbeat(enable bool) { + rt.hw.enableWrites(enable) +} diff --git a/go/vt/vttablet/tabletserver/repltracker/writer.go b/go/vt/vttablet/tabletserver/repltracker/writer.go index 249fd06efa3..18712eb715d 100644 --- a/go/vt/vttablet/tabletserver/repltracker/writer.go +++ b/go/vt/vttablet/tabletserver/repltracker/writer.go @@ -111,7 +111,7 @@ func (w *heartbeatWriter) Open() { log.Info("Hearbeat Writer: opening") w.pool.Open(w.env.Config().DB.AppWithDB(), w.env.Config().DB.DbaWithDB(), w.env.Config().DB.AppDebugWithDB()) - w.ticks.Start(w.writeHeartbeat) + w.enableWrites(true) w.isOpen = true } @@ -126,7 +126,7 @@ func (w *heartbeatWriter) Close() { return } - w.ticks.Stop() + w.enableWrites(false) w.pool.Close() w.isOpen = false log.Info("Hearbeat Writer: closed") @@ -182,3 +182,12 @@ func (w *heartbeatWriter) recordError(err error) { w.errorLog.Errorf("%v", err) writeErrors.Add(1) } + +// enableWrites actives or deactives heartbeat writes +func (w *heartbeatWriter) enableWrites(enable bool) { + if enable { + w.ticks.Start(w.writeHeartbeat) + } else { + w.ticks.Stop() + } +} diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index 12211acb72a..b91c6124306 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -72,7 +72,6 @@ var ( enableHotRowProtectionDryRun bool enableConsolidator bool enableConsolidatorReplicas bool - enableHeartbeat bool heartbeatInterval time.Duration healthCheckInterval time.Duration degradedThreshold time.Duration @@ -140,8 +139,9 @@ func init() { flag.BoolVar(¤tConfig.TransactionLimitByComponent, "transaction_limit_by_component", defaultConfig.TransactionLimitByComponent, "Include CallerID.component when considering who the user is for the purpose of transaction limit.") flag.BoolVar(¤tConfig.TransactionLimitBySubcomponent, "transaction_limit_by_subcomponent", defaultConfig.TransactionLimitBySubcomponent, "Include CallerID.subcomponent when considering who the user is for the purpose of transaction limit.") - flag.BoolVar(&enableHeartbeat, "heartbeat_enable", false, "If true, vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the table _vt.heartbeat. The result is used to inform the serving state of the vttablet via healthchecks.") - flag.DurationVar(&heartbeatInterval, "heartbeat_interval", 1*time.Second, "How frequently to read and write replication heartbeat.") + var dummyEnableHeartbeat bool + flag.BoolVar(&dummyEnableHeartbeat, "heartbeat_enable", true, "Always enabled, flag to be deprecated. vttablet records (if master) or checks (if replica) the current time of a replication heartbeat in the table _vt.heartbeat. The result is used to inform the serving state of the vttablet via healthchecks.") + flag.DurationVar(&heartbeatInterval, "heartbeat_interval", time.Duration(defaultConfig.ReplicationTracker.HeartbeatIntervalSeconds*1000)*time.Millisecond, "How frequently to read and write replication heartbeat. Maximum value: 1sec") flag.BoolVar(¤tConfig.EnforceStrictTransTables, "enforce_strict_trans_tables", defaultConfig.EnforceStrictTransTables, "If true, vttablet requires MySQL to run with STRICT_TRANS_TABLES or STRICT_ALL_TABLES on. It is recommended to not turn this flag off. Otherwise MySQL may alter your supplied values before saving them to the database.") flag.BoolVar(&enableConsolidator, "enable-consolidator", true, "This option enables the query consolidator.") @@ -182,17 +182,15 @@ func Init() { currentConfig.Consolidator = Disable } - switch { - case enableHeartbeat: - currentConfig.ReplicationTracker.Mode = Heartbeat - currentConfig.ReplicationTracker.HeartbeatIntervalSeconds.Set(heartbeatInterval) - case enableReplicationReporter: - currentConfig.ReplicationTracker.Mode = Polling - currentConfig.ReplicationTracker.HeartbeatIntervalSeconds = 0 - default: - currentConfig.ReplicationTracker.Mode = Disable - currentConfig.ReplicationTracker.HeartbeatIntervalSeconds = 0 + // We hard-code enable heartbeat, and force heartbeat interval onto a reasonable value + if heartbeatInterval == 0 { + heartbeatInterval = time.Duration(defaultConfig.ReplicationTracker.HeartbeatIntervalSeconds*1000) * time.Millisecond } + if heartbeatInterval > time.Second { + heartbeatInterval = time.Second + } + currentConfig.ReplicationTracker.Mode = Heartbeat + currentConfig.ReplicationTracker.HeartbeatIntervalSeconds.Set(heartbeatInterval) currentConfig.Healthcheck.IntervalSeconds.Set(healthCheckInterval) currentConfig.Healthcheck.DegradedThresholdSeconds.Set(degradedThreshold) @@ -418,7 +416,8 @@ var defaultConfig = TabletConfig{ UnhealthyThresholdSeconds: 7200, }, ReplicationTracker: ReplicationTrackerConfig{ - Mode: Disable, + Mode: Heartbeat, + HeartbeatIntervalSeconds: 0.25, }, HotRowProtection: HotRowProtectionConfig{ Mode: Disable, diff --git a/go/vt/vttablet/tabletserver/tabletenv/config_test.go b/go/vt/vttablet/tabletserver/tabletenv/config_test.go index 082c02c794f..19fe3b49c07 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config_test.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config_test.go @@ -131,7 +131,8 @@ oltpReadPool: size: 16 queryCacheSize: 5000 replicationTracker: - mode: disable + heartbeatIntervalSeconds: 0.25 + mode: heartbeat schemaReloadIntervalSeconds: 1800 streamBufferSize: 32768 txPool: @@ -215,7 +216,8 @@ func TestFlags(t *testing.T) { want.Healthcheck.IntervalSeconds = 20 want.Healthcheck.DegradedThresholdSeconds = 30 want.Healthcheck.UnhealthyThresholdSeconds = 7200 - want.ReplicationTracker.Mode = Disable + want.ReplicationTracker.Mode = Heartbeat + want.ReplicationTracker.HeartbeatIntervalSeconds = 0.25 assert.Equal(t, want.DB, currentConfig.DB) assert.Equal(t, want, currentConfig) @@ -267,31 +269,29 @@ func TestFlags(t *testing.T) { want.Consolidator = Disable assert.Equal(t, want, currentConfig) - enableHeartbeat = true heartbeatInterval = 1 * time.Second currentConfig.ReplicationTracker.Mode = "" - currentConfig.ReplicationTracker.HeartbeatIntervalSeconds = 0 + currentConfig.ReplicationTracker.HeartbeatIntervalSeconds = 0.2 Init() want.ReplicationTracker.Mode = Heartbeat want.ReplicationTracker.HeartbeatIntervalSeconds = 1 assert.Equal(t, want, currentConfig) - enableHeartbeat = false - heartbeatInterval = 1 * time.Second + heartbeatInterval = 5 * time.Second currentConfig.ReplicationTracker.Mode = "" currentConfig.ReplicationTracker.HeartbeatIntervalSeconds = 0 Init() - want.ReplicationTracker.Mode = Disable - want.ReplicationTracker.HeartbeatIntervalSeconds = 0 + want.ReplicationTracker.Mode = Heartbeat + want.ReplicationTracker.HeartbeatIntervalSeconds = 1 assert.Equal(t, want, currentConfig) enableReplicationReporter = true - heartbeatInterval = 1 * time.Second + heartbeatInterval = 0 * time.Second currentConfig.ReplicationTracker.Mode = "" currentConfig.ReplicationTracker.HeartbeatIntervalSeconds = 0 Init() - want.ReplicationTracker.Mode = Polling - want.ReplicationTracker.HeartbeatIntervalSeconds = 0 + want.ReplicationTracker.Mode = Heartbeat + want.ReplicationTracker.HeartbeatIntervalSeconds = 0.25 assert.Equal(t, want, currentConfig) healthCheckInterval = 1 * time.Second diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 6ef9abd7358..f34a96fe803 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1455,6 +1455,12 @@ func (tsv *TabletServer) registerTwopczHandler() { }) } +// EnableHeartbeat forces heartbeat to be on or off. +// Only to be used for testing. +func (tsv *TabletServer) EnableHeartbeat(enabled bool) { + tsv.rt.EnableHeartbeat(enabled) +} + // SetTracking forces tracking to be on or off. // Only to be used for testing. func (tsv *TabletServer) SetTracking(enabled bool) {