forked from open-telemetry/opentelemetry-collector-contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[receiver/kafkareceiver] fix: Kafka receiver blocking shutdown (open-…
…telemetry#35767) <!--Ex. Fixing a bug - Describe the bug and how this fixes the issue. Ex. Adding a feature - Explain what this achieves.--> #### Description Fixes an issue where the Kafka receiver would block on shutdown. There was an earlier fix for this issue [here](open-telemetry#32720). This does solve the issue, but it was only applied to the traces receiver, not the logs or metrics receiver. The issue is this go routine in the `Start()` functions for logs and metrics: ```go go func() { if err := c.consumeLoop(ctx, metricsConsumerGroup); err != nil { componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) } }() ``` The `consumeLoop()` function returns a `context.Canceled` error when `Shutdown()` is called, which is expected. However `componentstatus.ReportStatus()` blocks while attempting to report this error. The reason/bug for this can be found [here](open-telemetry/opentelemetry-collector#9824). The previously mentioned PR fixed this for the traces receiver by checking if the error returned by `consumeLoop()` is `context.Canceled`: ```go go func() { if err := c.consumeLoop(ctx, consumerGroup); !errors.Is(err, context.Canceled) { componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err)) } }() ``` Additionally, this is `consumeLoop()` for the traces receiver, with the logs and metrics versions being identical: ```go func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error { for { // `Consume` should be called inside an infinite loop, when a // server-side rebalance happens, the consumer session will need to be // recreated to get the new claims if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil { c.settings.Logger.Error("Error from consumer", zap.Error(err)) } // check if context was cancelled, signaling that the consumer should stop if ctx.Err() != nil { c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err())) return ctx.Err() } } } ``` This does fix the issue, however the only error that can be returned by `consumeLoop()` is a canceled context. When we create the context and cancel function, we use `context.Background()`: ```go ctx, cancel := context.WithCancel(context.Background()) ``` This context is only used by `consumeLoop()` and the cancel function is only called in `Shutdown()`. Because `consumeLoop()` can only return a `context.Canceled` error, this PR removes this unused code for the logs, metrics, and traces receivers. Instead, `consumeLoop()` still logs the `context.Canceled` error but it does not return any error and the go routine simply just calls `consumeLoop()`. Additional motivation for removing the call to `componentstatus.ReportStatus()` is the underlying function called by it, `componentstatus.Report()` says it does not need to be called during `Shutdown()` or `Start()` as the service already does so for the given component, [comment here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/component/componentstatus/status.go#L21-L25). Even if there wasn't a bug causing this call to block, the component still shouldn't call it since it would only be called during `Shutdown()`. <!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. --> #### Link to tracking issue Fixes open-telemetry#30789 <!--Describe what testing was performed and which tests were added.--> #### Testing Tested in a build of the collector with these changes scraping logs from a Kafka instance. When the collector is stopped and `Shutdown()` gets called, the receiver did not block and the collector stopped gracefully as expected.
- Loading branch information
1 parent
ebdae94
commit 0245db4
Showing
5 changed files
with
68 additions
and
32 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
# Use this changelog template to create an entry for release notes. | ||
|
||
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' | ||
change_type: bug_fix | ||
|
||
# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) | ||
component: kafkareceiver | ||
|
||
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). | ||
note: Fixes issue causing kafkareceiver to block during Shutdown(). | ||
|
||
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. | ||
issues: [30789] | ||
|
||
# (Optional) One or more lines of additional information to render under the primary note. | ||
# These lines will be padded with 2 spaces and then inserted directly into the document. | ||
# Use pipe (|) for multiline entries. | ||
subtext: | ||
|
||
# If your change doesn't affect end users or the exported elements of any package, | ||
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. | ||
# Optional: The change log or logs in which this entry should be included. | ||
# e.g. '[user]' or '[user, api]' | ||
# Include 'user' if the change is relevant to end users. | ||
# Include 'api' if there is a change to a library API. | ||
# Default: '[user]' | ||
change_logs: [] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters