From 84d4494ec07d20076b933e29fc438f18e396fa4d Mon Sep 17 00:00:00 2001 From: Malcolm Akinje Date: Fri, 21 Feb 2025 23:52:00 +0530 Subject: [PATCH 1/2] Increase health check channel buffer (#17821) Signed-off-by: Malcolm Akinje --- go.mod | 1 + go.sum | 2 ++ go/vt/discovery/healthcheck.go | 9 ++++++- go/vt/discovery/healthcheck_test.go | 39 +++++++++++++++++++++++++++++ 4 files changed, 50 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 16fda71de30..7116700de59 100644 --- a/go.mod +++ b/go.mod @@ -180,6 +180,7 @@ require ( github.com/tidwall/pretty v1.2.0 // indirect go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.7.0 // indirect + go.uber.org/goleak v1.3.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/appengine v1.6.7 // indirect diff --git a/go.sum b/go.sum index 8b10b8caeb0..70ac050edc4 100644 --- a/go.sum +++ b/go.sum @@ -801,6 +801,8 @@ go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 440737bd696..466a12f6e5b 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -58,6 +58,7 @@ import ( "vitess.io/vitess/go/vt/servenv" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/queryservice" ) @@ -68,6 +69,9 @@ var ( hcPrimaryPromotedCounters = stats.NewCountersWithMultiLabels("HealthcheckPrimaryPromoted", "Primary promoted in keyspace/shard name because of health check errors", []string{"Keyspace", "ShardName"}) healthcheckOnce sync.Once + // counter that tells us how many healthcheck messages have been dropped + hcChannelFullCounter = stats.NewCounter("HealthCheckChannelFullErrors", "Number of times the healthcheck broadcast channel was full") + // TabletURLTemplateString is a flag to generate URLs for the tablets that vtgate discovers. TabletURLTemplateString = "http://{{.GetTabletHostPort}}" tabletURLTemplate *template.Template @@ -598,7 +602,7 @@ func (hc *HealthCheckImpl) recomputeHealthy(key KeyspaceShardTabletType) { func (hc *HealthCheckImpl) Subscribe() chan *TabletHealth { hc.subMu.Lock() defer hc.subMu.Unlock() - c := make(chan *TabletHealth, 2) + c := make(chan *TabletHealth, 2048) hc.subscribers[c] = struct{}{} return c } @@ -617,6 +621,9 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) { select { case c <- th: default: + // If the channel is full, we drop the message. + hcChannelFullCounter.Add(1) + log.Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet)) } } } diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index 28a0dcf91fe..62c4b30b531 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -24,6 +24,7 @@ import ( "io" "strings" "sync" + "sync/atomic" "testing" "time" @@ -1307,6 +1308,44 @@ func TestDebugURLFormatting(t *testing.T) { require.Contains(t, wr.String(), expectedURL, "output missing formatted URL") } +// TestConcurrentUpdates tests that concurrent updates from the HealthCheck implementation aren't dropped. +// Added in response to https://github.com/vitessio/vitess/issues/17629. +func TestConcurrentUpdates(t *testing.T) { + // reset error counters + hcErrorCounters.ResetAll() + ts := memorytopo.NewServer("cell") + defer ts.Close() + hc := createTestHc(ts) + // close healthcheck + defer hc.Close() + + // Subscribe to the healthcheck + // Make the receiver keep track of the updates received. + ch := hc.Subscribe() + var totalCount atomic.Int32 + go func() { + for range ch { + totalCount.Add(1) + // Simulate a somewhat slow consumer. + time.Sleep(100 * time.Millisecond) + } + }() + + // Run multiple updates really quickly + // one after the other. + totalUpdates := 10 + for i := 0; i < totalUpdates; i++ { + hc.broadcast(&TabletHealth{}) + } + // Unsubscribe from the healthcheck + // and verify we process all the updates eventually. + hc.Unsubscribe(ch) + defer close(ch) + require.Eventuallyf(t, func() bool { + return totalUpdates == int(totalCount.Load()) + }, 5*time.Second, 100*time.Millisecond, "expected all updates to be processed") +} + func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservice.QueryService, error) { connMapMu.Lock() defer connMapMu.Unlock() From 49ba03c0ab1d3968d8b12beb5e3fb63a56bd7df7 Mon Sep 17 00:00:00 2001 From: Malcolm Akinje Date: Wed, 12 Mar 2025 15:48:13 -0500 Subject: [PATCH 2/2] go mod tidy Signed-off-by: Malcolm Akinje --- go.sum | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.sum b/go.sum index 70ac050edc4..09f36f326ab 100644 --- a/go.sum +++ b/go.sum @@ -799,8 +799,6 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= -go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=