Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka receiver stuck while shutting down at v0.93.0 #30789

Closed
james-ryans opened this issue Jan 26, 2024 · 19 comments · Fixed by #32720 or #35767
Closed

Kafka receiver stuck while shutting down at v0.93.0 #30789

james-ryans opened this issue Jan 26, 2024 · 19 comments · Fixed by #32720 or #35767
Labels
bug Something isn't working priority:p1 High receiver/kafka

Comments

@james-ryans
Copy link
Contributor

james-ryans commented Jan 26, 2024

Component(s)

receiver/kafka

What happened?

Description

Shutting down kafka receiver got stuck forever while transitioning from StatusStopping to StatusStopped.

I've debugged this for a while and apparently this is because of consumeLoop returns context canceled error and do ReportStatus FatalErrorEvent at receiver/kafkareceiver/kafka_receiver.go between line 163-165. But the sync.Mutex.Lock() in ReportStatus FatalErrorEvent never gets unlocked (I don't know why), so that the ReportStatus for StatusStopped stuck forever while trying to acquire the mutex lock.

Also, I've tried to rollback to v0.92.0 and it works well. And traced to issue down to receiver/kafkareceiver/kafka_receiver.go at line 164 c.settings.ReportStatus(component.NewFatalErrorEvent(err)) changed at PR #30593 was the cause.

Steps to Reproduce

Create a collector with kafkareceiver factory in it. And have a receivers.kafka in the config.

Expected Result

Should be able to shutdown properly.

Actual Result

Stuck indefinitely while shutting down with the logs below.

Collector version

v0.93.0

Environment information

No response

OpenTelemetry Collector configuration

service:
  pipelines:
    traces:
      receivers: [kafka]

receivers:
  kafka:
    brokers:
      - localhost:9092
    encoding: otlp_proto # available encodings are otlp_proto, jaeger_proto, jaeger_json, zipkin_proto, zipkin_json, zipkin_thrift
    initial_offset: earliest # consume messages from the beginning

Log output

2024-01-26T08:32:45.266+0700	info	[email protected]/kafka_receiver.go:431	Starting consumer group	{"kind": "receiver", "name": "kafka", "data_type": "traces", "partition": 0}
^C2024-01-26T08:32:53.626+0700	info	[email protected]/collector.go:258	Received signal from OS	{"signal": "interrupt"}
2024-01-26T08:32:53.626+0700	info	[email protected]/service.go:179	Starting shutdown...
2024-01-26T08:32:54.010+0700	info	[email protected]/kafka_receiver.go:181	Consumer stopped	{"kind": "receiver", "name": "kafka", "data_type": "traces", "error": "context canceled"}

Additional context

No response

@james-ryans james-ryans added bug Something isn't working needs triage New item requiring triage labels Jan 26, 2024
Copy link
Contributor

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@crobert-1
Copy link
Member

@mwear: Do you have any thoughts on why a component may never be able to get the lock when reporting status? It looks like this may be related to the work you've been doing on component status reporting.

Possible related PR: open-telemetry/opentelemetry-collector#8836

@mwear
Copy link
Member

mwear commented Jan 26, 2024

Based on the research @james-ryans did, this came in after this change: #30610. What I suspect is happening is that writing the fatal error to the asyncErrorChannel in serviceHost is blocking, so that ReportStatus never returns (and never releases its lock). Here is the suspect line: https://github.com/open-telemetry/opentelemetry-collector/blob/main/service/host.go#L73.

I think this is a variation of this existing problem: open-telemetry/opentelemetry-collector#8116, which is also assigned to me. It has been on my todo list. I'll look into it.

@crobert-1
Copy link
Member

Thanks @mwear, appreciate your insight here!

Copy link
Contributor

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@github-actions github-actions bot added the Stale label Mar 27, 2024
@crobert-1 crobert-1 removed the Stale label Mar 27, 2024
@atoulme
Copy link
Contributor

atoulme commented Mar 30, 2024

This is open-telemetry/opentelemetry-collector#9824.

@atoulme atoulme removed the needs triage New item requiring triage label Mar 30, 2024
@lahsivjar
Copy link
Member

This is open-telemetry/opentelemetry-collector#9824.

If I am not mistaken, this issue should happen for all receivers. Here is an example of a flaky test in the collector-contrib due to the same issue happening for the opencensus receiver: https://github.com/open-telemetry/opentelemetry-collector-contrib/actions/runs/8742859512/job/23992117763. The test has goroutine dump which indicates to the same problem with asyncErrorChannel as is pointed out in the shared issue.

@crobert-1
Copy link
Member

This is open-telemetry/opentelemetry-collector#9824.

Here is an example of a flaky test in the collector-contrib due to the same issue happening for the opencensus receiver: https://github.com/open-telemetry/opentelemetry-collector-contrib/actions/runs/8742859512/job/23992117763. The test has goroutine dump which indicates to the same problem with asyncErrorChannel as is pointed out in the shared issue.

Adding a reference to the issue for the flaky test: #27295

@crobert-1
Copy link
Member

+1 freq: #32667

@Dennis8274
Copy link

quick fix as follows?
image

@crobert-1
Copy link
Member

Your fixed worked @Dennis8274, thanks for the suggestion! I've posted a PR to resolve this issue. 👍

MovieStoreGuy pushed a commit that referenced this issue May 22, 2024
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
The kafka receiver's shutdown method is to cancel the context of a
running sub goroutine. However, a small bug was causing a fatal error to
be reported during shutdown when this expected condition was hit. The
fatal error being reported during shutdown was causing another bug to be
hit,
open-telemetry/opentelemetry-collector#9824.

This fix means that shutdown won't be blocked in expected shutdown
conditions, but the `core` bug referenced above means shutdown will
still be block in unexpected error situations.

This fix is being taken from a comment made by @Dennis8274 on the issue.

**Link to tracking Issue:** <Issue number if applicable>
Fixes #30789

**Testing:** <Describe what testing was performed and which tests were
added.>
Stepped through `TestTracesReceiverStart` in a debugger before the
change to see the fatal status being reported. It was no longer reported
after applying the fix. Manually tested running the collector with a
kafka receiver and saw that before the fix it was indeed being blocked
on a normal shutdown, but after the fix it shutdown as expected.
@tejas-contentstack
Copy link

I'm facing a similar issue while shutting down the Kafka receiver. If we try to shut down the collector, it stops the consumer however, starts the consumer right after. Check in the logs snapshot

Collector version: v0.101.0

Here's the Otel config file:

receivers:

  kafka/metrics:
    brokers:  ["0.0.0.0:9092", "localhost:9092", "localhost:9092"]
    group_id: otel-metrics-consumer
    topic: topic-otel-metrics
    header_extraction:
      extract_headers: true
      headers:
        - uid
    metadata:
      full: true
      
exporters:
  exporter:


service:
  telemetry:
    logs:
      output_paths: ["stdout"]
      error_output_paths: ["stderr"]
    metrics:
      level: none
  extensions: []
  pipelines: 
    metrics:
      receivers: [kafka/metrics]
      processors: []
      exporters: [exporter]
Screenshot 2024-08-30 at 1 11 42 PM

@crobert-1
Copy link
Member

Hello @tejas-contentstack, thanks for adding frequency! In this case it may be best to open a new issue and reference this one with it, since the error message is slightly different. I agree it looks like it may be a similar problem, but it would be best to investigate fully to make sure we're not missing anything 👍

@tbm48813
Copy link

Hello, I am seeing the same behavior in version 0.109.0. The issue is that the receiver never shuts down, it only hangs while trying. Relevant log entry is:
{"level":"info","ts":"2024-09-26T11:27:48.191-0400","caller":"[email protected]/kafka_receiver.go:388","msg":"Consumer stopped","kind":"receiver","name":"kafka/kafkastream__logs","data_type":"logs","error":"context canceled"}
The service needs to be manually stopped each time as it will hang here.

@djaglowski
Copy link
Member

It looks like this might be a very slightly different problem but also may be an easy fix (See #35438). I'm just going to reopen this issue and resolve it again with the new PR if that works.

@djaglowski djaglowski reopened this Sep 26, 2024
@djaglowski
Copy link
Member

My quick fix was wishful thinking. Still, I given that we have two reports of the receiver still not shutting down correctly after #32720, I think we might as well leave the issue open until we have a robust solution.

@jsirianni
Copy link
Member

I was seeing this as well. The receiver is working great until it is time to shutdown.

@dpaasman00
Copy link
Contributor

Working on a fix for this!

@dpaasman00
Copy link
Contributor

Above PR should resolve this issue!

djaglowski pushed a commit that referenced this issue Oct 22, 2024
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description
Fixes an issue where the Kafka receiver would block on shutdown.

There was an earlier fix for this issue
[here](#32720).
This does solve the issue, but it was only applied to the traces
receiver, not the logs or metrics receiver.

The issue is this go routine in the `Start()` functions for logs and
metrics:
```go
go func() {
        if err := c.consumeLoop(ctx, metricsConsumerGroup); err != nil {
		componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
	}
}()
```

The `consumeLoop()` function returns a `context.Canceled` error when
`Shutdown()` is called, which is expected. However
`componentstatus.ReportStatus()` blocks while attempting to report this
error. The reason/bug for this can be found
[here](open-telemetry/opentelemetry-collector#9824).

The previously mentioned PR fixed this for the traces receiver by
checking if the error returned by `consumeLoop()` is `context.Canceled`:
```go
go func() {
	if err := c.consumeLoop(ctx, consumerGroup); !errors.Is(err, context.Canceled) {
		componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
	}
}()
```

Additionally, this is `consumeLoop()` for the traces receiver, with the
logs and metrics versions being identical:
```go
func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
	for {
		// `Consume` should be called inside an infinite loop, when a
		// server-side rebalance happens, the consumer session will need to be
		// recreated to get the new claims
		if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil {
			c.settings.Logger.Error("Error from consumer", zap.Error(err))
		}
		// check if context was cancelled, signaling that the consumer should stop
		if ctx.Err() != nil {
			c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
			return ctx.Err()
		}
	}
}
```

This does fix the issue, however the only error that can be returned by
`consumeLoop()` is a canceled context. When we create the context and
cancel function, we use `context.Background()`:
```go
ctx, cancel := context.WithCancel(context.Background())
```
This context is only used by `consumeLoop()` and the cancel function is
only called in `Shutdown()`.

Because `consumeLoop()` can only return a `context.Canceled` error, this
PR removes this unused code for the logs, metrics, and traces receivers.
Instead, `consumeLoop()` still logs the `context.Canceled` error but it
does not return any error and the go routine simply just calls
`consumeLoop()`.

Additional motivation for removing the call to
`componentstatus.ReportStatus()` is the underlying function called by
it, `componentstatus.Report()` says it does not need to be called during
`Shutdown()` or `Start()` as the service already does so for the given
component, [comment
here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/component/componentstatus/status.go#L21-L25).
Even if there wasn't a bug causing this call to block, the component
still shouldn't call it since it would only be called during
`Shutdown()`.

<!-- Issue number (e.g. #1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes #30789

<!--Describe what testing was performed and which tests were added.-->
#### Testing
Tested in a build of the collector with these changes scraping logs from
a Kafka instance. When the collector is stopped and `Shutdown()` gets
called, the receiver did not block and the collector stopped gracefully
as expected.
sbylica-splunk pushed a commit to sbylica-splunk/opentelemetry-collector-contrib that referenced this issue Dec 17, 2024
…telemetry#35767)

<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
#### Description
Fixes an issue where the Kafka receiver would block on shutdown.

There was an earlier fix for this issue
[here](open-telemetry#32720).
This does solve the issue, but it was only applied to the traces
receiver, not the logs or metrics receiver.

The issue is this go routine in the `Start()` functions for logs and
metrics:
```go
go func() {
        if err := c.consumeLoop(ctx, metricsConsumerGroup); err != nil {
		componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
	}
}()
```

The `consumeLoop()` function returns a `context.Canceled` error when
`Shutdown()` is called, which is expected. However
`componentstatus.ReportStatus()` blocks while attempting to report this
error. The reason/bug for this can be found
[here](open-telemetry/opentelemetry-collector#9824).

The previously mentioned PR fixed this for the traces receiver by
checking if the error returned by `consumeLoop()` is `context.Canceled`:
```go
go func() {
	if err := c.consumeLoop(ctx, consumerGroup); !errors.Is(err, context.Canceled) {
		componentstatus.ReportStatus(host, componentstatus.NewFatalErrorEvent(err))
	}
}()
```

Additionally, this is `consumeLoop()` for the traces receiver, with the
logs and metrics versions being identical:
```go
func (c *kafkaTracesConsumer) consumeLoop(ctx context.Context, handler sarama.ConsumerGroupHandler) error {
	for {
		// `Consume` should be called inside an infinite loop, when a
		// server-side rebalance happens, the consumer session will need to be
		// recreated to get the new claims
		if err := c.consumerGroup.Consume(ctx, c.topics, handler); err != nil {
			c.settings.Logger.Error("Error from consumer", zap.Error(err))
		}
		// check if context was cancelled, signaling that the consumer should stop
		if ctx.Err() != nil {
			c.settings.Logger.Info("Consumer stopped", zap.Error(ctx.Err()))
			return ctx.Err()
		}
	}
}
```

This does fix the issue, however the only error that can be returned by
`consumeLoop()` is a canceled context. When we create the context and
cancel function, we use `context.Background()`:
```go
ctx, cancel := context.WithCancel(context.Background())
```
This context is only used by `consumeLoop()` and the cancel function is
only called in `Shutdown()`.

Because `consumeLoop()` can only return a `context.Canceled` error, this
PR removes this unused code for the logs, metrics, and traces receivers.
Instead, `consumeLoop()` still logs the `context.Canceled` error but it
does not return any error and the go routine simply just calls
`consumeLoop()`.

Additional motivation for removing the call to
`componentstatus.ReportStatus()` is the underlying function called by
it, `componentstatus.Report()` says it does not need to be called during
`Shutdown()` or `Start()` as the service already does so for the given
component, [comment
here](https://github.com/open-telemetry/opentelemetry-collector/blob/main/component/componentstatus/status.go#L21-L25).
Even if there wasn't a bug causing this call to block, the component
still shouldn't call it since it would only be called during
`Shutdown()`.

<!-- Issue number (e.g. open-telemetry#1234) or full URL to issue, if applicable. -->
#### Link to tracking issue
Fixes open-telemetry#30789

<!--Describe what testing was performed and which tests were added.-->
#### Testing
Tested in a build of the collector with these changes scraping logs from
a Kafka instance. When the collector is stopped and `Shutdown()` gets
called, the receiver did not block and the collector stopped gracefully
as expected.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment