Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ var (
hcPrimaryPromotedCounters = stats.NewCountersWithMultiLabels("HealthcheckPrimaryPromoted", "Primary promoted in keyspace/shard name because of health check errors", []string{"Keyspace", "ShardName"})
healthcheckOnce sync.Once

// counter that tells us how many healthcheck messages have been dropped
hcChannelFullCounter = stats.NewCounter("HealthCheckChannelFullErrors", "Number of times the healthcheck broadcast channel was full")

// TabletURLTemplateString is a flag to generate URLs for the tablets that vtgate discovers.
TabletURLTemplateString = "http://{{.GetTabletHostPort}}"
tabletURLTemplate *template.Template
Expand Down Expand Up @@ -632,7 +635,7 @@ func (hc *HealthCheckImpl) recomputeHealthy(key KeyspaceShardTabletType) {
func (hc *HealthCheckImpl) Subscribe() chan *TabletHealth {
hc.subMu.Lock()
defer hc.subMu.Unlock()
c := make(chan *TabletHealth, 2)
c := make(chan *TabletHealth, 2048)
hc.subscribers[c] = struct{}{}
return c
}
Expand All @@ -651,6 +654,9 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {
select {
case c <- th:
default:
// If the channel is full, we drop the message.
hcChannelFullCounter.Add(1)
log.Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet))
}
}
}
Expand Down
40 changes: 40 additions & 0 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"io"
"strings"
"sync"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1475,6 +1476,45 @@ func TestDebugURLFormatting(t *testing.T) {
require.Contains(t, wr.String(), expectedURL, "output missing formatted URL")
}

// TestConcurrentUpdates tests that concurrent updates from the HealthCheck implementation aren't dropped.
// Added in response to https://github.com/vitessio/vitess/issues/17629.
func TestConcurrentUpdates(t *testing.T) {
ctx := utils.LeakCheckContext(t)
// reset error counters
hcErrorCounters.ResetAll()
ts := memorytopo.NewServer(ctx, "cell")
defer ts.Close()
hc := createTestHc(ctx, ts)
// close healthcheck
defer hc.Close()

// Subscribe to the healthcheck
// Make the receiver keep track of the updates received.
ch := hc.Subscribe()
var totalCount atomic.Int32
go func() {
for range ch {
totalCount.Add(1)
// Simulate a somewhat slow consumer.
time.Sleep(100 * time.Millisecond)
}
}()

// Run multiple updates really quickly
// one after the other.
totalUpdates := 10
for i := 0; i < totalUpdates; i++ {
hc.broadcast(&TabletHealth{})
}
// Unsubscribe from the healthcheck
// and verify we process all the updates eventually.
hc.Unsubscribe(ch)
defer close(ch)
require.Eventuallyf(t, func() bool {
return totalUpdates == int(totalCount.Load())
}, 5*time.Second, 100*time.Millisecond, "expected all updates to be processed")
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth adding another test like this?

// TestBroadcastChannelFull tests that some concurrent broadcasts from the healthcheck to its subscribers may be dropped.
func TestBroadcastChannelFull(t *testing.T) {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can add one, but it doesn't seem that useful to me

func tabletDialer(ctx context.Context, tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservice.QueryService, error) {
connMapMu.Lock()
defer connMapMu.Unlock()
Expand Down
Loading