Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions go/vt/vttablet/tabletserver/vstreamer/rowstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"regexp"
"strconv"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -579,7 +580,7 @@ func TestStreamRowsHeartbeat(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

heartbeatCount := 0
var heartbeatCount int32
dataReceived := false

var options binlogdatapb.VStreamOptions
Expand All @@ -589,9 +590,9 @@ func TestStreamRowsHeartbeat(t *testing.T) {

err := engine.StreamRows(ctx, "select * from t1", nil, func(rows *binlogdatapb.VStreamRowsResponse) error {
if rows.Heartbeat {
heartbeatCount++
atomic.AddInt32(&heartbeatCount, 1)
// After receiving at least 3 heartbeats, we can be confident the fix is working
if heartbeatCount >= 3 {
if atomic.LoadInt32(&heartbeatCount) >= 3 {
cancel()
return nil
}
Expand All @@ -616,7 +617,7 @@ func TestStreamRowsHeartbeat(t *testing.T) {
// This is the critical test: we should receive multiple heartbeats
// Without the fix (missing for loop), we would only get 1 heartbeat
// With the fix, we should get at least 3 heartbeats
if heartbeatCount < 3 {
if atomic.LoadInt32(&heartbeatCount) < 3 {
t.Errorf("expected at least 3 heartbeats, got %d. This indicates the heartbeat goroutine is not running continuously", heartbeatCount)
}
}
Expand Down
Loading