diff --git a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go index 935bad6d3c5..01b09cd4090 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go @@ -21,6 +21,7 @@ import ( "fmt" "regexp" "strconv" + "sync/atomic" "testing" "time" @@ -573,7 +574,7 @@ func TestStreamRowsHeartbeat(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - heartbeatCount := 0 + var heartbeatCount int32 dataReceived := false var options binlogdatapb.VStreamOptions @@ -583,9 +584,9 @@ func TestStreamRowsHeartbeat(t *testing.T) { err := engine.StreamRows(ctx, "select * from t1", nil, func(rows *binlogdatapb.VStreamRowsResponse) error { if rows.Heartbeat { - heartbeatCount++ + atomic.AddInt32(&heartbeatCount, 1) // After receiving at least 3 heartbeats, we can be confident the fix is working - if heartbeatCount >= 3 { + if atomic.LoadInt32(&heartbeatCount) >= 3 { cancel() return nil } @@ -610,7 +611,7 @@ func TestStreamRowsHeartbeat(t *testing.T) { // This is the critical test: we should receive multiple heartbeats // Without the fix (missing for loop), we would only get 1 heartbeat // With the fix, we should get at least 3 heartbeats - if heartbeatCount < 3 { + if atomic.LoadInt32(&heartbeatCount) < 3 { t.Errorf("expected at least 3 heartbeats, got %d. This indicates the heartbeat goroutine is not running continuously", heartbeatCount) } }