diff --git a/go/mysql/flavor.go b/go/mysql/flavor.go
index 2902308d843..7204a36a916 100644
--- a/go/mysql/flavor.go
+++ b/go/mysql/flavor.go
@@ -315,8 +315,15 @@ func parseReplicationStatus(fields map[string]string) ReplicationStatus {
 	status.SourcePort = int(parseInt)
 	parseInt, _ = strconv.ParseInt(fields["Connect_Retry"], 10, 0)
 	status.ConnectRetry = int(parseInt)
-	parseUint, _ := strconv.ParseUint(fields["Seconds_Behind_Master"], 10, 0)
-	status.ReplicationLagSeconds = uint(parseUint)
+	parseUint, err := strconv.ParseUint(fields["Seconds_Behind_Master"], 10, 0)
+	if err != nil {
+		// we could not parse the value into a valid uint -- most commonly because the value is NULL from the
+		// database -- so let's reflect that the underlying value was unknown on our last check
+		status.ReplicationLagUnknown = true
+	} else {
+		status.ReplicationLagUnknown = false
+		status.ReplicationLagSeconds = uint(parseUint)
+	}
 	parseUint, _ = strconv.ParseUint(fields["Master_Server_Id"], 10, 0)
 	status.SourceServerID = uint(parseUint)
 
diff --git a/go/mysql/replication_status.go b/go/mysql/replication_status.go
index 801d38a680f..2c818dcb537 100644
--- a/go/mysql/replication_status.go
+++ b/go/mysql/replication_status.go
@@ -30,6 +30,9 @@ type ReplicationStatus struct {
 	// were to finish executing everything that's currently in its relay log.
 	// However, some MySQL flavors don't expose this information,
 	// in which case RelayLogPosition.IsZero() will be true.
+	// If ReplicationLagUnknown is true then we should not rely on the seconds
+	// behind value and we can instead try to calculate the lag ourselves when
+	// appropriate.
 	RelayLogPosition      Position
 	FilePosition          Position
 	FileRelayLogPosition  Position
@@ -37,6 +40,7 @@
 	IOThreadRunning       bool
 	SQLThreadRunning      bool
 	ReplicationLagSeconds uint
+	ReplicationLagUnknown bool
 	SourceHost            string
 	SourcePort            int
 	ConnectRetry          int
diff --git a/go/test/endtoend/tabletmanager/main_test.go b/go/test/endtoend/tabletmanager/main_test.go
index 08770665c49..9c118465edc 100644
--- a/go/test/endtoend/tabletmanager/main_test.go
+++ b/go/test/endtoend/tabletmanager/main_test.go
@@ -35,21 +35,23 @@ import (
 )
 
 var (
-	clusterInstance     *cluster.LocalProcessCluster
-	tmClient            *tmc.Client
-	primaryTabletParams mysql.ConnParams
-	replicaTabletParams mysql.ConnParams
-	primaryTablet       cluster.Vttablet
-	replicaTablet       cluster.Vttablet
-	rdonlyTablet        cluster.Vttablet
-	hostname            = "localhost"
-	keyspaceName        = "ks"
-	shardName           = "0"
-	keyspaceShard       = "ks/" + shardName
-	dbName              = "vt_" + keyspaceName
-	username            = "vt_dba"
-	cell                = "zone1"
-	sqlSchema           = `
+	clusterInstance                  *cluster.LocalProcessCluster
+	tmClient                         *tmc.Client
+	primaryTabletParams              mysql.ConnParams
+	replicaTabletParams              mysql.ConnParams
+	primaryTablet                    cluster.Vttablet
+	replicaTablet                    cluster.Vttablet
+	rdonlyTablet                     cluster.Vttablet
+	hostname                         = "localhost"
+	keyspaceName                     = "ks"
+	shardName                        = "0"
+	keyspaceShard                    = "ks/" + shardName
+	dbName                           = "vt_" + keyspaceName
+	username                         = "vt_dba"
+	cell                             = "zone1"
+	tabletHealthcheckRefreshInterval = 5 * time.Second
+	tabletUnhealthyThreshold         = tabletHealthcheckRefreshInterval * 2
+	sqlSchema                        = `
 	create table t1(
 		id bigint,
 		value varchar(16),
@@ -93,12 +95,17 @@ func TestMain(m *testing.M) {
 		}
 
 		// List of users authorized to execute vschema ddl operations
-		clusterInstance.VtGateExtraArgs = []string{"-vschema_ddl_authorized_users=%"}
+		clusterInstance.VtGateExtraArgs = []string{
+			"-vschema_ddl_authorized_users=%",
+			"-discovery_low_replication_lag", tabletUnhealthyThreshold.String(),
+		}
 		// Set extra tablet args for lock timeout
 		clusterInstance.VtTabletExtraArgs = []string{
 			"-lock_tables_timeout", "5s",
 			"-watch_replication_stream",
 			"-enable_replication_reporter",
+			"-health_check_interval", tabletHealthcheckRefreshInterval.String(),
+			"-unhealthy_threshold", tabletUnhealthyThreshold.String(),
 		}
 		// We do not need semiSync for this test case.
 		clusterInstance.EnableSemiSync = false
diff --git a/go/test/endtoend/tabletmanager/tablet_health_test.go b/go/test/endtoend/tabletmanager/tablet_health_test.go
index 81946a1edea..a27ab5a3d47 100644
--- a/go/test/endtoend/tabletmanager/tablet_health_test.go
+++ b/go/test/endtoend/tabletmanager/tablet_health_test.go
@@ -24,6 +24,7 @@ import (
 	"strings"
 	"sync"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/require"
 
@@ -134,7 +135,7 @@ func TestHealthCheck(t *testing.T) {
 	// make sure the health stream is updated
 	result, err := clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("VtTabletStreamHealth", "-count", "1", rTablet.Alias)
 	require.NoError(t, err)
-	verifyStreamHealth(t, result)
+	verifyStreamHealth(t, result, true)
 
 	// then restart replication, make sure we stay healthy
 	err = clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", rTablet.Alias)
@@ -148,7 +149,48 @@
 	require.NoError(t, err)
 	scanner := bufio.NewScanner(strings.NewReader(result))
 	for scanner.Scan() {
-		verifyStreamHealth(t, scanner.Text())
+		verifyStreamHealth(t, scanner.Text(), true)
+	}
+
+	// stop the replica's source mysqld instance to break replication
+	// and test that the replica tablet becomes unhealthy and non-serving after crossing
+	// the tablet's -unhealthy_threshold and the gateway's -discovery_low_replication_lag
+	err = primaryTablet.MysqlctlProcess.Stop()
+	require.NoError(t, err)
+
+	time.Sleep(tabletUnhealthyThreshold + tabletHealthcheckRefreshInterval)
+
+	// now the replica's VtTabletStreamHealth should show it as unhealthy
+	result, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("VtTabletStreamHealth", "-count", "1", rTablet.Alias)
+	require.NoError(t, err)
+	scanner = bufio.NewScanner(strings.NewReader(result))
+	for scanner.Scan() {
+		verifyStreamHealth(t, scanner.Text(), false)
+	}
+
+	// start the primary tablet's mysqld back up
+	primaryTablet.MysqlctlProcess.InitMysql = false
+	err = primaryTablet.MysqlctlProcess.Start()
+	primaryTablet.MysqlctlProcess.InitMysql = true
+	require.NoError(t, err)
+
+	// explicitly start replication on all of the replicas to avoid any test flakiness as they were all
+	// replicating from the primary instance
+	err = clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", rTablet.Alias)
+	require.NoError(t, err)
+	err = clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", replicaTablet.Alias)
+	require.NoError(t, err)
+	err = clusterInstance.VtctlclientProcess.ExecuteCommand("StartReplication", rdonlyTablet.Alias)
+	require.NoError(t, err)
+
+	time.Sleep(tabletHealthcheckRefreshInterval)
+
+	// now the replica's VtTabletStreamHealth should show it as healthy again
+	result, err = clusterInstance.VtctlclientProcess.ExecuteCommandWithOutput("VtTabletStreamHealth", "-count", "1", rTablet.Alias)
+	require.NoError(t, err)
+	scanner = bufio.NewScanner(strings.NewReader(result))
+	for scanner.Scan() {
+		verifyStreamHealth(t, scanner.Text(), true)
 	}
 
 	// Manual cleanup of processes
@@ -183,7 +225,7 @@
 	assert.Equal(t, want, got)
 }
 
-func verifyStreamHealth(t *testing.T, result string) {
+func verifyStreamHealth(t *testing.T, result string, expectHealthy bool) {
 	var streamHealthResponse querypb.StreamHealthResponse
 	err := json2.Unmarshal([]byte(result), &streamHealthResponse)
 	require.NoError(t, err)
@@ -191,10 +233,14 @@
 	UID := streamHealthResponse.GetTabletAlias().GetUid()
 	realTimeStats := streamHealthResponse.GetRealtimeStats()
 	replicationLagSeconds := realTimeStats.GetReplicationLagSeconds()
-	assert.True(t, serving, "Tablet should be in serving state")
 	assert.True(t, UID > 0, "Tablet should contain uid")
-	// replicationLagSeconds varies till 7200 so setting safe limit
-	assert.True(t, replicationLagSeconds < 10000, "replica should not be behind primary")
+	if expectHealthy {
+		assert.True(t, serving, "Tablet should be in serving state")
+		// replicationLagSeconds can be as high as 7200, so use a safe upper limit
+		assert.True(t, replicationLagSeconds < 10000, "replica should not be behind primary")
+	} else {
+		assert.True(t, (!serving || replicationLagSeconds >= uint32(tabletUnhealthyThreshold.Seconds())), "Tablet should not be both serving and healthy")
+	}
 }
 
 func TestHealthCheckDrainedStateDoesNotShutdownQueryService(t *testing.T) {
diff --git a/go/vt/vttablet/tabletserver/repltracker/poller.go b/go/vt/vttablet/tabletserver/repltracker/poller.go
index 995223dc28d..5a9112be82b 100644
--- a/go/vt/vttablet/tabletserver/repltracker/poller.go
+++ b/go/vt/vttablet/tabletserver/repltracker/poller.go
@@ -50,7 +50,12 @@ func (p *poller) Status() (time.Duration, error) {
 		return 0, err
 	}
 
-	if !status.ReplicationRunning() {
+	// If replication is not currently running or we don't know what the lag is -- most commonly
+	// because the replica mysqld is in the process of trying to start replicating from its source
+	// but it hasn't yet reached the point where it can calculate the seconds_behind_master
+	// value and it's thus NULL -- then we will estimate the lag ourselves using the last seen
+	// value + the time elapsed since.
+	if !status.ReplicationRunning() || status.ReplicationLagUnknown {
 		if p.timeRecorded.IsZero() {
 			return 0, vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "replication is not running")
 		}
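
A note on the flavor.go hunk above: while a replica is still connecting to its source, `SHOW SLAVE STATUS` reports `Seconds_Behind_Master` as NULL, so the field reaches the parser as a non-numeric string and `strconv.ParseUint` returns an error. That error is exactly the signal the patch now uses to set `ReplicationLagUnknown` instead of silently recording a lag of zero. A minimal standalone sketch of the parsing behavior (the input values here are illustrative):

```go
package main

import (
	"fmt"
	"strconv"
)

func main() {
	// A replicating tablet reports a numeric lag value, which parses cleanly.
	lag, err := strconv.ParseUint("42", 10, 0)
	fmt.Println(lag, err) // 42 <nil>

	// While replication is starting up, the value is the literal string "NULL",
	// which ParseUint rejects -- the error is the cue that the lag is unknown.
	lag, err = strconv.ParseUint("NULL", 10, 0)
	fmt.Println(lag, err) // 0 strconv.ParseUint: parsing "NULL": invalid syntax
}
```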
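The poller.go hunk cuts off just after the `timeRecorded.IsZero()` check, so it does not show the estimation path its new comment describes. Below is a hedged sketch of that "last seen value plus elapsed time" idea using a simplified type: `lagPoller`, `lag`, and `estimate` are illustrative names, and only `timeRecorded` appears in the diff itself.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// lagPoller mirrors the shape of the repltracker poller for illustration only;
// the real poller also wraps a mysqld connection and guards state with a mutex.
type lagPoller struct {
	lag          time.Duration // last successfully observed lag (assumed field)
	timeRecorded time.Time     // when that observation was made (name from the diff)
}

// estimate stands in for the branch taken when replication is not running or
// ReplicationLagUnknown is set, i.e. when no live lag value is available.
func (p *lagPoller) estimate() (time.Duration, error) {
	if p.timeRecorded.IsZero() {
		// No earlier observation to extrapolate from: same error path as the diff.
		return 0, errors.New("replication is not running")
	}
	// Assume the replica has fallen further behind by exactly the wall-clock
	// time elapsed since the last known-good reading.
	return time.Since(p.timeRecorded) + p.lag, nil
}

func main() {
	p := &lagPoller{
		lag:          3 * time.Second,
		timeRecorded: time.Now().Add(-10 * time.Second),
	}
	fmt.Println(p.estimate()) // roughly 13s, <nil>
}
```

Because this estimate only grows while the true lag stays unknown, a tablet stuck in that state eventually crosses -unhealthy_threshold and stops serving, which is the behavior the end-to-end test above waits for.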