From 7c59e0b7c34bcc075e748b605fce42466d705521 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 19 Feb 2025 15:03:21 +0530 Subject: [PATCH 1/4] feat: increase healthcheck buffer size and add a counter for the number of messages dropped Signed-off-by: Manan Gupta --- go/vt/discovery/healthcheck.go | 6 +- go/vt/discovery/healthcheck_test.go | 103 ++++++++++++++++++++++++++++ 2 files changed, 108 insertions(+), 1 deletion(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 2f270bd7518..f4d75d2bb07 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -69,6 +69,8 @@ var ( hcPrimaryPromotedCounters = stats.NewCountersWithMultiLabels("HealthcheckPrimaryPromoted", "Primary promoted in keyspace/shard name because of health check errors", []string{"Keyspace", "ShardName"}) healthcheckOnce sync.Once + hcMessagesDropped = stats.NewCounter("HealthCheckMessagesDropped", "Number of messages dropped by the healthcheck because the subscriber buffer was full") + // TabletURLTemplateString is a flag to generate URLs for the tablets that vtgate discovers. TabletURLTemplateString = "http://{{.GetTabletHostPort}}" tabletURLTemplate *template.Template @@ -632,7 +634,7 @@ func (hc *HealthCheckImpl) recomputeHealthy(key KeyspaceShardTabletType) { func (hc *HealthCheckImpl) Subscribe() chan *TabletHealth { hc.subMu.Lock() defer hc.subMu.Unlock() - c := make(chan *TabletHealth, 2) + c := make(chan *TabletHealth, 2048) hc.subscribers[c] = struct{}{} return c } @@ -651,6 +653,8 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) { select { case c <- th: default: + log.Errorf("HealthCheckImpl.broadcast: subscriber buffer full, dropping message") + hcMessagesDropped.Add(1) } } } diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index 6a03e1589dd..e46919c7c87 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -1475,6 +1475,109 @@ func TestDebugURLFormatting(t *testing.T) { require.Contains(t, wr.String(), expectedURL, "output missing formatted URL") } +// TestConcurrentUpdates tests that concurrent updates from the HealthCheck implementation aren't dropped. +// Added in response to https://github.com/vitessio/vitess/issues/17629. +func TestConcurrentUpdates(t *testing.T) { + ctx := utils.LeakCheckContext(t) + // reset error counters + hcErrorCounters.ResetAll() + ts := memorytopo.NewServer(ctx, "cell") + defer ts.Close() + hc := createTestHc(ctx, ts) + // close healthcheck + defer hc.Close() + + // Subscribe to the healthcheck + // Make the receiver keep track of the updates received. + ch := hc.Subscribe() + totalCount := 0 + go func() { + for range ch { + totalCount++ + // Simulate a somewhat slow consumer. + time.Sleep(100 * time.Millisecond) + } + }() + + // Run multiple updates really quickly + // one after the other. + totalUpdates := 10 + for i := 0; i < totalUpdates; i++ { + hc.broadcast(&TabletHealth{}) + } + // Unsubscribe from the healthcheck + // and verify we process all the updates eventually. + hc.Unsubscribe(ch) + defer close(ch) + require.Eventuallyf(t, func() bool { + return totalUpdates == totalCount + }, 5*time.Second, 100*time.Millisecond, "expected all updates to be processed") +} + +// BenchmarkAccess_FastConsumer benchmarks the access time of the healthcheck for a fast consumer. +func BenchmarkAccess_FastConsumer(b *testing.B) { + ctx := context.Background() + // reset error counters + hcErrorCounters.ResetAll() + ts := memorytopo.NewServer(ctx, "cell") + defer ts.Close() + hc := createTestHc(ctx, ts) + // close healthcheck + defer hc.Close() + + for i := 0; i < b.N; i++ { + // Subscribe to the healthcheck with a fast consumer. + ch := hc.Subscribe() + go func() { + for range ch { + } + }() + + for id := 0; id < 1000; id++ { + hc.broadcast(&TabletHealth{}) + } + hc.Unsubscribe(ch) + waitForEmptyMessageQueue(ch) + } +} + +// BenchmarkAccess_SlowConsumer benchmarks the access time of the healthcheck for a slow consumer. +func BenchmarkAccess_SlowConsumer(b *testing.B) { + ctx := context.Background() + // reset error counters + hcErrorCounters.ResetAll() + ts := memorytopo.NewServer(ctx, "cell") + defer ts.Close() + hc := createTestHc(ctx, ts) + // close healthcheck + defer hc.Close() + + for i := 0; i < b.N; i++ { + // Subscribe to the healthcheck with a slow consumer. + ch := hc.Subscribe() + go func() { + for range ch { + time.Sleep(50 * time.Millisecond) + } + }() + + for id := 0; id < 100; id++ { + hc.broadcast(&TabletHealth{}) + } + hc.Unsubscribe(ch) + waitForEmptyMessageQueue(ch) + } +} + +func waitForEmptyMessageQueue(queue chan *TabletHealth) { + for { + if len(queue) == 0 { + return + } + time.Sleep(100 * time.Millisecond) + } +} + func tabletDialer(ctx context.Context, tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservice.QueryService, error) { connMapMu.Lock() defer connMapMu.Unlock() From f5fdb7ceefdc4124361b7a4cb0554dded6e855ac Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 19 Feb 2025 19:21:11 +0530 Subject: [PATCH 2/4] feat: fix data race in the test Signed-off-by: Manan Gupta --- go/vt/discovery/healthcheck_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index e46919c7c87..83df1718884 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -23,6 +23,7 @@ import ( "io" "strings" "sync" + "sync/atomic" "testing" "time" @@ -1490,10 +1491,10 @@ func TestConcurrentUpdates(t *testing.T) { // Subscribe to the healthcheck // Make the receiver keep track of the updates received. ch := hc.Subscribe() - totalCount := 0 + var totalCount atomic.Int32 go func() { for range ch { - totalCount++ + totalCount.Add(1) // Simulate a somewhat slow consumer. time.Sleep(100 * time.Millisecond) } @@ -1510,7 +1511,7 @@ func TestConcurrentUpdates(t *testing.T) { hc.Unsubscribe(ch) defer close(ch) require.Eventuallyf(t, func() bool { - return totalUpdates == totalCount + return totalUpdates == int(totalCount.Load()) }, 5*time.Second, 100*time.Millisecond, "expected all updates to be processed") } From 4ccfe5dde8803b40db6c079db68a25ac019efa7c Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 21 Feb 2025 14:41:38 +0530 Subject: [PATCH 3/4] feat: address review comments Signed-off-by: Manan Gupta --- go/vt/discovery/healthcheck.go | 8 +++-- go/vt/discovery/healthcheck_test.go | 55 ----------------------------- 2 files changed, 5 insertions(+), 58 deletions(-) diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index f4d75d2bb07..5734749b167 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -69,7 +69,8 @@ var ( hcPrimaryPromotedCounters = stats.NewCountersWithMultiLabels("HealthcheckPrimaryPromoted", "Primary promoted in keyspace/shard name because of health check errors", []string{"Keyspace", "ShardName"}) healthcheckOnce sync.Once - hcMessagesDropped = stats.NewCounter("HealthCheckMessagesDropped", "Number of messages dropped by the healthcheck because the subscriber buffer was full") + // counter that tells us how many healthcheck messages have been dropped + hcChannelFullCounter = stats.NewCounter("HealthCheckChannelFullErrors", "Number of times the healthcheck broadcast channel was full") // TabletURLTemplateString is a flag to generate URLs for the tablets that vtgate discovers. TabletURLTemplateString = "http://{{.GetTabletHostPort}}" @@ -653,8 +654,9 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) { select { case c <- th: default: - log.Errorf("HealthCheckImpl.broadcast: subscriber buffer full, dropping message") - hcMessagesDropped.Add(1) + // If the channel is full, we drop the message. + hcChannelFullCounter.Add(1) + log.Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet)) } } } diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index 83df1718884..b78450d4843 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -1515,61 +1515,6 @@ func TestConcurrentUpdates(t *testing.T) { }, 5*time.Second, 100*time.Millisecond, "expected all updates to be processed") } -// BenchmarkAccess_FastConsumer benchmarks the access time of the healthcheck for a fast consumer. -func BenchmarkAccess_FastConsumer(b *testing.B) { - ctx := context.Background() - // reset error counters - hcErrorCounters.ResetAll() - ts := memorytopo.NewServer(ctx, "cell") - defer ts.Close() - hc := createTestHc(ctx, ts) - // close healthcheck - defer hc.Close() - - for i := 0; i < b.N; i++ { - // Subscribe to the healthcheck with a fast consumer. - ch := hc.Subscribe() - go func() { - for range ch { - } - }() - - for id := 0; id < 1000; id++ { - hc.broadcast(&TabletHealth{}) - } - hc.Unsubscribe(ch) - waitForEmptyMessageQueue(ch) - } -} - -// BenchmarkAccess_SlowConsumer benchmarks the access time of the healthcheck for a slow consumer. -func BenchmarkAccess_SlowConsumer(b *testing.B) { - ctx := context.Background() - // reset error counters - hcErrorCounters.ResetAll() - ts := memorytopo.NewServer(ctx, "cell") - defer ts.Close() - hc := createTestHc(ctx, ts) - // close healthcheck - defer hc.Close() - - for i := 0; i < b.N; i++ { - // Subscribe to the healthcheck with a slow consumer. - ch := hc.Subscribe() - go func() { - for range ch { - time.Sleep(50 * time.Millisecond) - } - }() - - for id := 0; id < 100; id++ { - hc.broadcast(&TabletHealth{}) - } - hc.Unsubscribe(ch) - waitForEmptyMessageQueue(ch) - } -} - func waitForEmptyMessageQueue(queue chan *TabletHealth) { for { if len(queue) == 0 { From 2b3e5659b8574f9f7d72c0fab22e8caa2044a307 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 21 Feb 2025 14:43:13 +0530 Subject: [PATCH 4/4] feat: remove unused function Signed-off-by: Manan Gupta --- go/vt/discovery/healthcheck_test.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index b78450d4843..81f7d8b80b1 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -1515,15 +1515,6 @@ func TestConcurrentUpdates(t *testing.T) { }, 5*time.Second, 100*time.Millisecond, "expected all updates to be processed") } -func waitForEmptyMessageQueue(queue chan *TabletHealth) { - for { - if len(queue) == 0 { - return - } - time.Sleep(100 * time.Millisecond) - } -} - func tabletDialer(ctx context.Context, tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservice.QueryService, error) { connMapMu.Lock() defer connMapMu.Unlock()