panic: runtime error: invalid memory address or nil pointer dereference #24
Looks to be a sync error between getting topic metadata and assigning heartbeats.
Thanks for the report, and apologies for the panic. I'll be able to look into this later in the day my time. I checked a few things quickly, and the code causing this panic has a comment saying the caller must be non-nil at that point, so that comment is clearly based on old information. My offhand guess is that something got lost / moved / added during refactorings. How reliably are you able to trigger this panic?
At the moment it's happening on every app run, consuming from 200+ topics individually. Our Kafka server looks to be at 80%+ load. I was in the middle of some debugging to add some extra info, but it's like the topic metadata is not fully populated before topics_and_partitions.go:73 is run. I only see one topic listed in the metadata list, and not the topic we should be subscribed to for heartbeats.

Code using:

client, err := kgo.NewClient(
	kgo.SeedBrokers(c.brokers...),
	kgo.Dialer(tlsDialer.DialContext),
)
if err != nil {
	return err
}
defer client.Close()

client.AssignGroup(c.id, kgo.GroupTopics(c.topic), kgo.DisableAutoCommit())

We create 200 clients in the app, one for each topic, at the moment, as we are currently testing this library as a drop-in replacement; it would be optimised in that respect later. I don't see this being the problem, as the library does not appear to contain any shared state from what I can tell.
Cool, thanks for the update; it doesn't sound like an isolated case, then. I was planning to fully audit the consumer group code soon, and I'll push up that timeline based off this issue (this weekend or next week). I do see something that I suspect and can validate later when I'm on a laptop: the code does not currently handle the case where a client is assigned a topic it actually did not express interest in. I'll also look for a case where it is assigned something but the internal topic partitions struct has not been created for some reason. What group balancer are you using here?
I don't override the default balancer which this client provides.
Thanks! And lastly, are all consumers in the group using this client?
All consumer groups with the same consumer id, yes. There is only one instance of this app running, so there should be no other clients connecting with the same id.
Interesting, so this is happening in the cooperative case. If it's easy, can you try using the StickyBalancer? And if that still causes the issue, can you try the RangeBalancer? These can help give clues as to which areas of my code to look into.
Here's what I'm currently looking at:
I can still fix the panic itself by fixing up fetchOffsets, but given that you said all consumers are using this client, there's a bug somewhere else. Can you also confirm that each consumer is using its own dedicated client? That is, no consumer is sharing a client? I may add some info-level logging around what a client expresses interest in when joining a group, and then what that client is assigned.
We do get some connection timeouts on this cluster, if that's any help. This happens a lot more frequently in other Go libs; not sure about the Python ones we use.
Using different balancers:

Sticky

client.AssignGroup(c.id, kgo.GroupTopics(c.topic), kgo.DisableAutoCommit(), kgo.Balancers(kgo.StickyBalancer()))

Not crashing, but a lot of topics are not getting any data, like they are stuck. There is a lot of lag on these topics. Only 3 topics look to have got data according to our metrics.

Range

Crashing.

Could it be related to the number of topics we have on our Kafka instance? We are only consuming from a subset; the total is around 350 topics. Yes, no consumer is sharing a client, as we create a whole new client per topic. Something like this is what happens internally:

topics := []string{} // Would contain around 200 topics
wg := &sync.WaitGroup{}
for _, topic := range topics {
	topic := topic
	wg.Add(1) // add before starting the goroutine so wg.Wait cannot return early
	go func() {
		defer wg.Done()
		client, err := kgo.NewClient(
			kgo.SeedBrokers(c.brokers...),
			kgo.Dialer(tlsDialer.DialContext),
		)
		if err != nil {
			// Handle / log the error; a goroutine cannot return it
			return
		}
		defer client.Close()
		client.AssignGroup("consumerID", kgo.GroupTopics(topic), kgo.DisableAutoCommit())
		for {
			fetches := client.PollFetches(ctx)
			for _, fetchErr := range fetches.Errors() {
				// Handle errors
				_ = fetchErr
			}
			iter := fetches.RecordIter()
			for !iter.Done() {
				msg := iter.Next()
				// Do something with the message
				_ = msg
			}
		}
	}()
}
wg.Wait()
Thanks for testing. If it's not too much, could you try the RoundRobin balancer too? I think the dialing is unrelated; that's at a lower level than this library.
Also, I see that you're disabling autocommitting -- can you try with autocommitting enabled? Also, if you're open to joining the Discord channel, I can ask some higher-bandwidth questions, and I really appreciate your assisted debugging so far. The surface area of where this could be occurring is so far quite large, so I'm trying to narrow down at least a few things about it.
Ok, round robin: no crash, just stuck. The reason auto committing is disabled is that we only want to commit once we know the message has been processed correctly, but I'll happily disable that for a quick test. Yeah, I should be able to join Discord, but it will most likely be tomorrow.
Thanks! Here's my current plan:
When I get those fixes / logs in, if possible I'd like you to re-run the cooperative sticky test (the original balancer), the sticky test, and the range test. My hope is that the balancing logs there can give some more concrete things to look at. I can have these small commits pushed today.
Auto commit: same results. Some topics are working and receiving messages. At this point I'm just assuming it's pot luck which topics work; the topics that work are sometimes different, when they do work.
No problem
Issue #24 showed that the range balancer was broken. On first glance, the issue appears to be because the consumerIdx wraps past the end of potentialConsumers, and the obvious fix is to just inc and mod. This would be an incorrect fix. The real problem lies above in determining div and rem. The whole point is that (div*(potentialConsumers) + rem) == num partitions, thus if we drain div and one from rem every loop, then we will never increment past the end of potential consumers, because partitions will be completely drained on the last consumer. The prior logic accidentally made div smaller and rem larger, thus giving too few partitions to each consumer, and then in some scenarios, would cause us to exhaust consumers while still draining partitions. I will be adding unit tests later.
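To make the arithmetic concrete, here is a minimal, hypothetical sketch of range-style assignment under the invariant described above; the names (rangeAssign, consumers, partitions) are illustrative and this is not franz-go's actual code:

// Each consumer receives div partitions, and the first rem consumers receive
// one extra, so div*len(consumers)+rem == len(partitions) and the partitions
// are exhausted exactly on the last consumer -- no index ever wraps.
func rangeAssign(consumers []string, partitions []int32) map[string][]int32 {
	div, rem := len(partitions)/len(consumers), len(partitions)%len(consumers)
	assigned := make(map[string][]int32, len(consumers))
	start := 0
	for i, c := range consumers {
		n := div
		if i < rem {
			n++ // drain one extra from rem for each of the first rem consumers
		}
		assigned[c] = partitions[start : start+n]
		start += n
	}
	return assigned
}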
I've pushed two commits, can you try those tests with |
Ok here is the log output for client.AssignGroup(c.id, kgo.GroupTopics(c.topic), kgo.DisableAutoCommit())
client.AssignGroup(c.id, kgo.GroupTopics(c.topic), kgo.DisableAutoCommit(), kgo.Balancers(kgo.StickyBalancer()))
client.AssignGroup(c.id, kgo.GroupTopics(c.topic), kgo.DisableAutoCommit(), kgo.Balancers(kgo.RangeBalancer()))
@owenhaynes I added the new logs at the Debug level, and I think the default is Info -- is it possible to up the level to Debug? I see some logs that are at the Info level but none of the new ones I added. Also, how many members are in the group?
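For reference, a minimal sketch of turning on debug logging when constructing the client, assuming the kgo.WithLogger / kgo.BasicLogger options as they exist in current franz-go (exact option names may differ slightly in v0.6.x):

client, err := kgo.NewClient(
	kgo.SeedBrokers(brokers...),
	// Route client logs to stderr at the Debug level so the new
	// group-balance logs show up.
	kgo.WithLogger(kgo.BasicLogger(os.Stderr, kgo.LogLevelDebug, nil)),
)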
I thought I had set logging to debug; I will double check. Just one member, as we only have 1 pod running.
Here are the fixed debug logs
FYI, we upgraded to Kafka 2.7 today; the issue still exists, so it is not related to using Kafka 2.6.
As an update: I have an idea of why the topic assignment after the group balance isn't working for you, and I am in the middle of fixing that. I'm not sure yet what caused the original issue you reported, though.
This is a companion commit to 89146f6, which realistically _needs_ this commit because:

- I did not properly split the work and add to the commit properly
- this commit fixes some lock ordering problems I noticed while writing that commit

This should fix the panic in #24 by at least logging when it would be detected and continuing; however, the bug itself is still a mystery. The debug logs about what the balance results were should help, though, if this crops up again.

There are a few lock ordering fixes in here which are now documented extensively. Notably, PollFetches needs the consumer mu, and there is a huge reason as to why. The prerevoke and revoke logic, and how we ensure things are done before returning sometimes, is all more extensively documented.

Lastly, all instances of assignPartitions are now properly guarded by the consumer mutex. Prior, some instances were not.
ATOMIC STORES
=============

This commit switches the consumer type to be stored in an atomic value, rather than a uint8 type that specifies which pointer to use guarded by a mutex. This switch fundamentally arises from trying to unblock metadata updates while a group consumer is leaving the group. Previously, we had to grab the consumer lock to check the consumer type to check if we were consuming with regex. We avoid that now.

This actually makes a bunch of other areas simpler as well -- many places needed the group consumer to do some logic on the group consumer directly. Previously, we had to grab the consumer lock, and for simplicity we held it through the function. Holding it was unnecessary, and now we avoid grabbing the lock at all.

Anything that sets the consumer value grabs a new dedicated assignMu. The dedicated assignMu allows us to unblock a clean group leave, which may (in a shortly incoming commit) grab the consumer mu to assign partitions on revoke.

We do not have to worry about TOCTOU: the guarantee is things work in order. If a person concurrently modifies something, they may change the outcome of stuff that was set into sequence by original events, but the outcome is still sound according to our client. Particularly, a later metadata update will trigger the right sequence for the new assignment. Same type of logic with offset setting, but people should not be doing that concurrently with assigning and whatnot.

UPDATES & LOCK ORDERING FIXES
=============================

This is the bulk of this commit, which mostly fixes some lock orderings and missing locks. This should fix the panic in #24 by at least logging when it would be detected and continuing; however, the bug itself is still a mystery. The debug logs about what the balance results were should help, though, if this crops up again.

There are a few lock ordering fixes in here which are now documented extensively. Notably, PollFetches needs the consumer mu, and there is a huge reason as to why. The prerevoke and revoke logic, and how we ensure things are done before returning sometimes, is all more extensively documented.

Lastly, all instances of assignPartitions are now properly guarded by the consumer mutex. Prior, some instances were not.
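As a rough illustration of the pattern the commit describes (not franz-go's actual types; client, groupConsumer, and assignMu here are hypothetical names): readers load the current consumer through an atomic value without locking, while anything that replaces it serializes on a dedicated assign mutex.

package example

import (
	"sync"
	"sync/atomic"
)

type groupConsumer interface{ usesRegex() bool }

// consumerBox wraps the interface so atomic.Value always stores one concrete type.
type consumerBox struct{ c groupConsumer }

type client struct {
	assignMu sync.Mutex   // serializes anything that sets the consumer value
	consumer atomic.Value // holds a consumerBox with the current consumer
}

// usingRegex is a lock-free read: a metadata update no longer needs the
// consumer mutex just to check the consumer type.
func (cl *client) usingRegex() bool {
	if box, ok := cl.consumer.Load().(consumerBox); ok && box.c != nil {
		return box.c.usesRegex()
	}
	return false
}

// assign replaces the consumer under the dedicated assignMu, leaving the
// consumer mutex free for partition revocation during a group leave.
func (cl *client) assign(next groupConsumer) {
	cl.assignMu.Lock()
	defer cl.assignMu.Unlock()
	cl.consumer.Store(consumerBox{c: next})
}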
For posterity, this was moved to Discord and fixed in the following three commits:
These commits are released in v0.6.10
Client version v0.6.9
Kafka Version: kafka 2.6
Connection Auth: MTLS connection
Auto commit disabled
Our Kafka dev cluster can be under high load, so no idea if it's related
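Since the client in this thread is configured with kgo.Dialer(tlsDialer.DialContext), here is a hedged sketch of how that mTLS dialer might be wired up using the standard library's crypto/tls, net, and time packages; the function name, certificate paths, and timeout are assumptions for illustration and are not taken from the issue:

func newMTLSClient(brokers []string) (*kgo.Client, error) {
	// Hypothetical certificate paths; the real certificate handling is not shown in the issue.
	cert, err := tls.LoadX509KeyPair("client.crt", "client.key")
	if err != nil {
		return nil, err
	}
	tlsDialer := &tls.Dialer{
		NetDialer: &net.Dialer{Timeout: 10 * time.Second},
		Config: &tls.Config{
			Certificates: []tls.Certificate{cert},
			// RootCAs would normally point at the cluster's CA bundle.
		},
	}
	return kgo.NewClient(
		kgo.SeedBrokers(brokers...),
		kgo.Dialer(tlsDialer.DialContext), // hand the TLS dial function to the client
	)
}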