From 223732cfe6729ab5543d77e61ee196087164c4e0 Mon Sep 17 00:00:00 2001 From: Yury Taranau Date: Thu, 17 Jul 2025 10:26:27 +0200 Subject: [PATCH 1/2] feat: Implement client reset for recoverable errors in Sarama calls --- ...kafkametricsreceiver-retryable-errors.yaml | 28 +++++++++++++ .../kafkametricsreceiver/broker_scraper.go | 2 +- .../broker_scraper_test.go | 2 + .../kafkametricsreceiver/consumer_scraper.go | 18 +++++++-- .../consumer_scraper_test.go | 16 ++++++-- receiver/kafkametricsreceiver/receiver.go | 40 +++++++++++++++++++ .../kafkametricsreceiver/topic_scraper.go | 14 ++++++- .../topic_scraper_test.go | 4 ++ 8 files changed, 113 insertions(+), 11 deletions(-) create mode 100644 .chloggen/kafkametricsreceiver-retryable-errors.yaml diff --git a/.chloggen/kafkametricsreceiver-retryable-errors.yaml b/.chloggen/kafkametricsreceiver-retryable-errors.yaml new file mode 100644 index 0000000000000..bb87905b78e4b --- /dev/null +++ b/.chloggen/kafkametricsreceiver-retryable-errors.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkametricsreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Implement client reset for recoverable errors in Sarama calls + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [41363] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This change implements client reset functionality to address recoverable errors in Sarama calls, such as connection resets and EOF errors. When a recoverable error is encountered, the client is reset, enabling the scraper to reconnect and resume metric collection seamlessly. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/kafkametricsreceiver/broker_scraper.go b/receiver/kafkametricsreceiver/broker_scraper.go index b0a5be6dc44cc..b93bbb2fe92a4 100644 --- a/receiver/kafkametricsreceiver/broker_scraper.go +++ b/receiver/kafkametricsreceiver/broker_scraper.go @@ -48,7 +48,7 @@ func (s *brokerScraper) shutdown(context.Context) error { func (s *brokerScraper) scrape(context.Context) (pmetric.Metrics, error) { scrapeErrors := scrapererror.ScrapeErrors{} - if s.client == nil { + if s.client == nil || s.client.Closed() { client, err := newSaramaClient(context.Background(), s.config.ClientConfig) if err != nil { return pmetric.Metrics{}, fmt.Errorf("failed to create client in brokers scraper: %w", err) diff --git a/receiver/kafkametricsreceiver/broker_scraper_test.go b/receiver/kafkametricsreceiver/broker_scraper_test.go index b19cda9454757..aa5319cfde470 100644 --- a/receiver/kafkametricsreceiver/broker_scraper_test.go +++ b/receiver/kafkametricsreceiver/broker_scraper_test.go @@ -88,6 +88,7 @@ func TestBrokerScraper_shutdown_handles_nil_client(t *testing.T) { func TestBrokerScraper_empty_resource_attribute(t *testing.T) { client := newMockClient() client.Mock.On("Brokers").Return(testBrokers) + client.Mock.On("Closed").Return(false) bs := brokerScraper{ client: client, settings: receivertest.NewNopSettings(metadata.Type), @@ -108,6 +109,7 @@ func TestBrokerScraper_empty_resource_attribute(t *testing.T) { func TestBrokerScraper_scrape(t *testing.T) { client := newMockClient() client.Mock.On("Brokers").Return(testBrokers) + client.Mock.On("Closed").Return(false) bs := brokerScraper{ client: client, settings: receivertest.NewNopSettings(metadata.Type), diff --git a/receiver/kafkametricsreceiver/consumer_scraper.go b/receiver/kafkametricsreceiver/consumer_scraper.go index bdf64c8d6a49d..6919133ccb341 100644 --- a/receiver/kafkametricsreceiver/consumer_scraper.go +++ b/receiver/kafkametricsreceiver/consumer_scraper.go @@ -43,7 +43,7 @@ func (s *consumerScraper) shutdown(_ context.Context) error { } func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) { - if s.client == nil { + if s.client == nil || s.client.Closed() { client, err := newSaramaClient(context.Background(), s.config.ClientConfig) if err != nil { return pmetric.Metrics{}, fmt.Errorf("failed to create client in consumer scraper: %w", err) @@ -64,7 +64,7 @@ func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) { cgs, listErr := s.clusterAdmin.ListConsumerGroups() if listErr != nil { - return pmetric.Metrics{}, listErr + return pmetric.Metrics{}, s.resetClientOnError(listErr) } var matchedGrpIDs []string @@ -76,7 +76,7 @@ func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) { allTopics, listErr := s.clusterAdmin.ListTopics() if listErr != nil { - return pmetric.Metrics{}, listErr + return pmetric.Metrics{}, s.resetClientOnError(listErr) } matchedTopics := map[string]sarama.TopicDetail{} @@ -110,7 +110,7 @@ func (s *consumerScraper) scrape(context.Context) (pmetric.Metrics, error) { } consumerGroups, listErr := s.clusterAdmin.DescribeConsumerGroups(matchedGrpIDs) if listErr != nil { - return pmetric.Metrics{}, listErr + return pmetric.Metrics{}, s.resetClientOnError(listErr) } now := pcommon.NewTimestampFromTime(time.Now()) @@ -186,3 +186,13 @@ func createConsumerScraper(_ context.Context, cfg Config, settings receiver.Sett scraper.WithShutdown(s.shutdown), ) } + +func (s *consumerScraper) resetClientOnError(err error) error { + if isRecoverableError(err) { + s.clusterAdmin.Close() + s.clusterAdmin = nil + return fmt.Errorf("closing client because of reconnection error %w", err) + } + + return err +} diff --git a/receiver/kafkametricsreceiver/consumer_scraper_test.go b/receiver/kafkametricsreceiver/consumer_scraper_test.go index 189c370860494..604b0c4af0248 100644 --- a/receiver/kafkametricsreceiver/consumer_scraper_test.go +++ b/receiver/kafkametricsreceiver/consumer_scraper_test.go @@ -123,14 +123,16 @@ func TestConsumerScraper_createScraper_handles_invalid_group_match(t *testing.T) } func TestConsumerScraper_scrape(t *testing.T) { + client := newMockClient() filter := regexp.MustCompile(defaultGroupMatch) cs := consumerScraper{ - client: newMockClient(), + client: client, settings: receivertest.NewNopSettings(metadata.Type), clusterAdmin: newMockClusterAdmin(), topicFilter: filter, groupFilter: filter, } + client.Mock.On("Closed").Return(false) require.NoError(t, cs.start(t.Context(), componenttest.NewNopHost())) md, err := cs.scrape(t.Context()) assert.NoError(t, err) @@ -149,36 +151,41 @@ func TestConsumerScraper_scrape_handlesListTopicError(t *testing.T) { topicFilter: filter, groupFilter: filter, } + client.Mock.On("Closed").Return(false) _, err := cs.scrape(t.Context()) assert.Error(t, err) } func TestConsumerScraper_scrape_handlesListConsumerGroupError(t *testing.T) { + client := newMockClient() filter := regexp.MustCompile(defaultGroupMatch) clusterAdmin := newMockClusterAdmin() clusterAdmin.consumerGroups = nil cs := consumerScraper{ - client: newMockClient(), + client: client, settings: receivertest.NewNopSettings(metadata.Type), clusterAdmin: clusterAdmin, topicFilter: filter, groupFilter: filter, } + client.Mock.On("Closed").Return(false) _, err := cs.scrape(t.Context()) assert.Error(t, err) } func TestConsumerScraper_scrape_handlesDescribeConsumerError(t *testing.T) { + client := newMockClient() filter := regexp.MustCompile(defaultGroupMatch) clusterAdmin := newMockClusterAdmin() clusterAdmin.consumerGroupDescriptions = nil cs := consumerScraper{ - client: newMockClient(), + client: client, settings: receivertest.NewNopSettings(metadata.Type), clusterAdmin: clusterAdmin, topicFilter: filter, groupFilter: filter, } + client.Mock.On("Closed").Return(false) _, err := cs.scrape(t.Context()) assert.Error(t, err) } @@ -196,11 +203,11 @@ func TestConsumerScraper_scrape_handlesOffsetPartialError(t *testing.T) { topicFilter: filter, clusterAdmin: clusterAdmin, } + client.Mock.On("Closed").Return(false) require.NoError(t, cs.start(t.Context(), componenttest.NewNopHost())) _, err := cs.scrape(t.Context()) assert.Error(t, err) } - func TestConsumerScraper_scrape_handlesPartitionPartialError(t *testing.T) { filter := regexp.MustCompile(defaultGroupMatch) clusterAdmin := newMockClusterAdmin() @@ -214,6 +221,7 @@ func TestConsumerScraper_scrape_handlesPartitionPartialError(t *testing.T) { topicFilter: filter, clusterAdmin: clusterAdmin, } + client.Mock.On("Closed").Return(false) require.NoError(t, cs.start(t.Context(), componenttest.NewNopHost())) _, err := cs.scrape(t.Context()) assert.Error(t, err) diff --git a/receiver/kafkametricsreceiver/receiver.go b/receiver/kafkametricsreceiver/receiver.go index 4123fb204603e..45755dd5194f0 100644 --- a/receiver/kafkametricsreceiver/receiver.go +++ b/receiver/kafkametricsreceiver/receiver.go @@ -5,7 +5,12 @@ package kafkametricsreceiver // import "github.com/open-telemetry/opentelemetry- import ( "context" + "errors" "fmt" + "io" + "net" + "os" + "syscall" "github.com/IBM/sarama" "go.opentelemetry.io/collector/component" @@ -60,3 +65,38 @@ var newMetricsReceiver = func( scraperControllerOptions..., ) } + +// isRecoverableError checks if the error can be resolved by re-establishing connection +func isRecoverableError(err error) bool { + if errors.Is(err, sarama.ErrOutOfBrokers) { + return true + } + + if errors.Is(err, sarama.ErrClosedClient) { + return true + } + + if errors.Is(err, os.ErrDeadlineExceeded) { + // Error example: read tcp 10.2.3.4:62523->4.3.2.1:9093: i/o timeout + return true + } + + if errors.Is(err, syscall.EPIPE) { + return true + } + + if errors.Is(err, net.ErrClosed) { + return true + } + + if errors.Is(err, syscall.ECONNRESET) { + // Error example: write tcp 1.2.3.4:56532->4.3.2.1:9093: write: connection reset by peer + return true + } + + if errors.Is(err, io.EOF) { + return true + } + + return false +} diff --git a/receiver/kafkametricsreceiver/topic_scraper.go b/receiver/kafkametricsreceiver/topic_scraper.go index 12c7138b1d7e3..65fbd5c06c45c 100644 --- a/receiver/kafkametricsreceiver/topic_scraper.go +++ b/receiver/kafkametricsreceiver/topic_scraper.go @@ -50,7 +50,7 @@ func (s *topicScraper) start(_ context.Context, _ component.Host) error { } func (s *topicScraper) scrape(context.Context) (pmetric.Metrics, error) { - if s.client == nil { + if s.client == nil || s.client.Closed() { client, err := newSaramaClient(context.Background(), s.config.ClientConfig) if err != nil { return pmetric.Metrics{}, fmt.Errorf("failed to create client in topics scraper: %w", err) @@ -61,7 +61,7 @@ func (s *topicScraper) scrape(context.Context) (pmetric.Metrics, error) { topics, err := s.client.Topics() if err != nil { s.settings.Logger.Error("Error fetching cluster topics ", zap.Error(err)) - return pmetric.Metrics{}, err + return pmetric.Metrics{}, s.resetClientOnError(err) } scrapeErrors := scrapererror.ScrapeErrors{} @@ -184,3 +184,13 @@ func createTopicsScraper(_ context.Context, cfg Config, settings receiver.Settin scraper.WithShutdown(s.shutdown), ) } + +func (s *topicScraper) resetClientOnError(err error) error { + if isRecoverableError(err) { + s.client.Close() + s.client = nil + return fmt.Errorf("closing client because of reconnection error %w", err) + } + + return err +} diff --git a/receiver/kafkametricsreceiver/topic_scraper_test.go b/receiver/kafkametricsreceiver/topic_scraper_test.go index 123563301541e..76d5eccbe9eba 100644 --- a/receiver/kafkametricsreceiver/topic_scraper_test.go +++ b/receiver/kafkametricsreceiver/topic_scraper_test.go @@ -109,6 +109,7 @@ func TestTopicScraper_scrapes(t *testing.T) { config: *config, topicFilter: match, } + client.Mock.On("Closed").Return(false) require.NoError(t, scraper.start(t.Context(), componenttest.NewNopHost())) md, err := scraper.scrape(t.Context()) assert.NoError(t, err) @@ -153,6 +154,7 @@ func TestTopicScraper_scrape_handlesTopicError(t *testing.T) { settings: receivertest.NewNopSettings(metadata.Type), topicFilter: match, } + client.Mock.On("Closed").Return(false) _, err := scraper.scrape(t.Context()) assert.Error(t, err) } @@ -168,6 +170,7 @@ func TestTopicScraper_scrape_handlesPartitionError(t *testing.T) { settings: receivertest.NewNopSettings(metadata.Type), topicFilter: match, } + client.Mock.On("Closed").Return(false) require.NoError(t, scraper.start(t.Context(), componenttest.NewNopHost())) _, err := scraper.scrape(t.Context()) assert.Error(t, err) @@ -187,6 +190,7 @@ func TestTopicScraper_scrape_handlesPartialScrapeErrors(t *testing.T) { settings: receivertest.NewNopSettings(metadata.Type), topicFilter: match, } + client.Mock.On("Closed").Return(false) require.NoError(t, scraper.start(t.Context(), componenttest.NewNopHost())) _, err := scraper.scrape(t.Context()) assert.Error(t, err) From 12bcce21f68b72093c117117e348a9facd1c11e1 Mon Sep 17 00:00:00 2001 From: Yury Taranau Date: Fri, 18 Jul 2025 14:29:28 +0200 Subject: [PATCH 2/2] added lock --- receiver/kafkametricsreceiver/consumer_scraper.go | 4 ++++ receiver/kafkametricsreceiver/consumer_scraper_test.go | 1 + receiver/kafkametricsreceiver/topic_scraper.go | 4 ++++ 3 files changed, 9 insertions(+) diff --git a/receiver/kafkametricsreceiver/consumer_scraper.go b/receiver/kafkametricsreceiver/consumer_scraper.go index 6919133ccb341..2a22987c4c524 100644 --- a/receiver/kafkametricsreceiver/consumer_scraper.go +++ b/receiver/kafkametricsreceiver/consumer_scraper.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "regexp" + "sync" "time" "github.com/IBM/sarama" @@ -28,6 +29,7 @@ type consumerScraper struct { clusterAdmin sarama.ClusterAdmin config Config mb *metadata.MetricsBuilder + mu sync.Mutex } func (s *consumerScraper) start(_ context.Context, _ component.Host) error { @@ -189,6 +191,8 @@ func createConsumerScraper(_ context.Context, cfg Config, settings receiver.Sett func (s *consumerScraper) resetClientOnError(err error) error { if isRecoverableError(err) { + s.mu.Lock() + defer s.mu.Unlock() s.clusterAdmin.Close() s.clusterAdmin = nil return fmt.Errorf("closing client because of reconnection error %w", err) diff --git a/receiver/kafkametricsreceiver/consumer_scraper_test.go b/receiver/kafkametricsreceiver/consumer_scraper_test.go index 604b0c4af0248..6568ecff3df8c 100644 --- a/receiver/kafkametricsreceiver/consumer_scraper_test.go +++ b/receiver/kafkametricsreceiver/consumer_scraper_test.go @@ -208,6 +208,7 @@ func TestConsumerScraper_scrape_handlesOffsetPartialError(t *testing.T) { _, err := cs.scrape(t.Context()) assert.Error(t, err) } + func TestConsumerScraper_scrape_handlesPartitionPartialError(t *testing.T) { filter := regexp.MustCompile(defaultGroupMatch) clusterAdmin := newMockClusterAdmin() diff --git a/receiver/kafkametricsreceiver/topic_scraper.go b/receiver/kafkametricsreceiver/topic_scraper.go index 65fbd5c06c45c..07d495c693e38 100644 --- a/receiver/kafkametricsreceiver/topic_scraper.go +++ b/receiver/kafkametricsreceiver/topic_scraper.go @@ -8,6 +8,7 @@ import ( "fmt" "regexp" "strconv" + "sync" "time" "github.com/IBM/sarama" @@ -29,6 +30,7 @@ type topicScraper struct { topicFilter *regexp.Regexp config Config mb *metadata.MetricsBuilder + mu sync.Mutex } const ( @@ -187,6 +189,8 @@ func createTopicsScraper(_ context.Context, cfg Config, settings receiver.Settin func (s *topicScraper) resetClientOnError(err error) error { if isRecoverableError(err) { + s.mu.Lock() + defer s.mu.Unlock() s.client.Close() s.client = nil return fmt.Errorf("closing client because of reconnection error %w", err)