Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ var (
hcPrimaryPromotedCounters = stats.NewCountersWithMultiLabels("HealthcheckPrimaryPromoted", "Primary promoted in keyspace/shard name because of health check errors", []string{"Keyspace", "ShardName"})
healthcheckOnce sync.Once

hcMessagesDropped = stats.NewCounter("HealthCheckMessagesDropped", "Number of messages dropped by the healthcheck because the subscriber buffer was full")
Comment thread
GuptaManan100 marked this conversation as resolved.
Outdated

// TabletURLTemplateString is a flag to generate URLs for the tablets that vtgate discovers.
TabletURLTemplateString = "http://{{.GetTabletHostPort}}"
tabletURLTemplate *template.Template
Expand Down Expand Up @@ -632,7 +634,7 @@ func (hc *HealthCheckImpl) recomputeHealthy(key KeyspaceShardTabletType) {
func (hc *HealthCheckImpl) Subscribe() chan *TabletHealth {
hc.subMu.Lock()
defer hc.subMu.Unlock()
c := make(chan *TabletHealth, 2)
c := make(chan *TabletHealth, 2048)
hc.subscribers[c] = struct{}{}
return c
}
Expand All @@ -651,6 +653,8 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {
select {
case c <- th:
default:
log.Errorf("HealthCheckImpl.broadcast: subscriber buffer full, dropping message")
hcMessagesDropped.Add(1)
Comment thread
GuptaManan100 marked this conversation as resolved.
Outdated
}
}
}
Expand Down
104 changes: 104 additions & 0 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1475,6 +1476,109 @@ func TestDebugURLFormatting(t *testing.T) {
require.Contains(t, wr.String(), expectedURL, "output missing formatted URL")
}

// TestConcurrentUpdates tests that concurrent updates from the HealthCheck implementation aren't dropped.
// Added in response to https://github.com/vitessio/vitess/issues/17629.
func TestConcurrentUpdates(t *testing.T) {
ctx := utils.LeakCheckContext(t)
// reset error counters
hcErrorCounters.ResetAll()
ts := memorytopo.NewServer(ctx, "cell")
defer ts.Close()
hc := createTestHc(ctx, ts)
// close healthcheck
defer hc.Close()

// Subscribe to the healthcheck
// Make the receiver keep track of the updates received.
ch := hc.Subscribe()
var totalCount atomic.Int32
go func() {
for range ch {
totalCount.Add(1)
// Simulate a somewhat slow consumer.
time.Sleep(100 * time.Millisecond)
}
}()

// Run multiple updates really quickly
// one after the other.
totalUpdates := 10
for i := 0; i < totalUpdates; i++ {
hc.broadcast(&TabletHealth{})
}
// Unsubscribe from the healthcheck
// and verify we process all the updates eventually.
hc.Unsubscribe(ch)
defer close(ch)
require.Eventuallyf(t, func() bool {
return totalUpdates == int(totalCount.Load())
}, 5*time.Second, 100*time.Millisecond, "expected all updates to be processed")
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth adding another test like this?

// TestBroadcastChannelFull tests that some concurrent broadcasts from the healthcheck to its subscribers may be dropped.
func TestBroadcastChannelFull(t *testing.T) {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add one, but it doesn't seem that useful to me

// BenchmarkAccess_FastConsumer benchmarks the access time of the healthcheck for a fast consumer.
func BenchmarkAccess_FastConsumer(b *testing.B) {
Comment thread
GuptaManan100 marked this conversation as resolved.
Outdated
ctx := context.Background()
// reset error counters
hcErrorCounters.ResetAll()
ts := memorytopo.NewServer(ctx, "cell")
defer ts.Close()
hc := createTestHc(ctx, ts)
// close healthcheck
defer hc.Close()

for i := 0; i < b.N; i++ {
// Subscribe to the healthcheck with a fast consumer.
ch := hc.Subscribe()
go func() {
for range ch {
}
}()

for id := 0; id < 1000; id++ {
hc.broadcast(&TabletHealth{})
}
hc.Unsubscribe(ch)
waitForEmptyMessageQueue(ch)
}
}

// BenchmarkAccess_SlowConsumer benchmarks the access time of the healthcheck for a slow consumer.
func BenchmarkAccess_SlowConsumer(b *testing.B) {
ctx := context.Background()
// reset error counters
hcErrorCounters.ResetAll()
ts := memorytopo.NewServer(ctx, "cell")
defer ts.Close()
hc := createTestHc(ctx, ts)
// close healthcheck
defer hc.Close()

for i := 0; i < b.N; i++ {
// Subscribe to the healthcheck with a slow consumer.
ch := hc.Subscribe()
go func() {
for range ch {
time.Sleep(50 * time.Millisecond)
}
}()

for id := 0; id < 100; id++ {
hc.broadcast(&TabletHealth{})
}
hc.Unsubscribe(ch)
waitForEmptyMessageQueue(ch)
}
}

func waitForEmptyMessageQueue(queue chan *TabletHealth) {
for {
if len(queue) == 0 {
return
}
time.Sleep(100 * time.Millisecond)
}
}

func tabletDialer(ctx context.Context, tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservice.QueryService, error) {
connMapMu.Lock()
defer connMapMu.Unlock()
Expand Down