Kafka NullPointerException on fetch #535

Closed
alistairking opened this issue Aug 9, 2023 · 14 comments · Fixed by #546
Labels
bug Something isn't working minor

Comments

@alistairking
Contributor

Howdy!

Since we upgraded our Kafka cluster to 3.5.1 (from 2.7.0) we've frequently been seeing errors like this:

[2023-08-09 15:02:04,003] ERROR [KafkaApi-4] Unexpected error handling request RequestHeader(apiKey=FETCH, apiVersion=15, clientId=kgo, correlationId=29, headerVersion=2) -- FetchRequestData(clusterId=null, replicaId=-1, replicaState=ReplicaState(replicaId=-1, replicaEpoch=-1), maxWaitMs=5000, minBytes=1, maxBytes=52428800, isolationLevel=0, sessionId=606347500, sessionEpoch=29, topics=[], forgottenTopicsData=[ForgottenTopic(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAA, partitions=[0])], rackId='') with context RequestContext(header=RequestHeader(apiKey=FETCH, apiVersion=15, clientId=kgo, correlationId=29, headerVersion=2), connectionId='10.99.108.16:9192-10.88.7.32:63300-535', clientAddress=/10.88.7.32, principal=User:ANONYMOUS, listenerName=ListenerName(INSIDE), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=unknown, softwareVersion=unknown), fromPrivilegedListener=true, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@7efc7797]) (kafka.server.KafkaApis)
java.lang.NullPointerException

I'm not at all sure this is franz-go's fault, but given that it's complaining about handling a request I thought that just maybe the client is doing something wrong.

@twmb
Owner

twmb commented Aug 9, 2023

Does this come when you're closing a client?
Did this ever happen before?
What are the kgo logs around the time?

@twmb
Owner

twmb commented Aug 9, 2023

Also what version of Kafka were you upgrading from? And what's your inter-broker-protocol set to?

@twmb
Owner

twmb commented Aug 9, 2023

This might be another instance of #295.
I solved #295 only for partitions that are added to fetch requests;
for a partition that is "forgotten" and removed, I think there's a gap in my logic.

@alistairking
Contributor Author

We upgraded from 2.7.0, inter-broker is set to 3.5

No clients were being closed at the time, but there's a decent chance that new topics/partitions were created recently (not deleted however).

As for kgo logs, there's a bunch of things like

immediate metadata update triggered
fetch had inner topic errors: FENCED_LEADER_EPOCH{....

and

resetting fetch session
INVALID_FETCH_SESSION_EPOCH: The fetch session epoch is invalid.

@twmb
Owner

twmb commented Aug 9, 2023

Definitely not 295 -- the logic I was worried about is actually complete.

Can you enable debug logs?
I've also noticed Kafka is a bit worse with metadata in 3.5 so I wonder what the metadata responses are -- and some surrounding logs.

@alistairking
Contributor Author

Yep, I should be able to do that.

@alistairking
Contributor Author

alistairking commented Aug 9, 2023

I'm still working on getting the relevant debug logs, but this just happened again, and our entire Kafka cluster crashed.
I think it's possible/likely that one broker crashed and then we somehow ended up with a cascading failure.

Here's a sample of the errors we got from Kafka in case they're interesting:

[2023-08-09 18:40:01,468] ERROR [ReplicaFetcher replicaId=4, leaderId=6, fetcherId=8] Error for partition xxx-0>
org.apache.kafka.common.errors.KafkaStorageException: Disk error when trying to access log file on the disk.
java.io.IOException: Connection to 6 was disconnected before the response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:99)
        at kafka.server.BrokerBlockingSender.sendRequest(BrokerBlockingSender.scala:113)
        at kafka.server.RemoteLeaderEndPoint.fetch(RemoteLeaderEndPoint.scala:79)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:316)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
        at scala.Option.foreach(Option.scala:437)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:112)
        at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:98)
        at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:127)

and

[2023-08-09 18:48:27,710] ERROR Closing socket for 10.3.108.16:9192-10.2.22.42:48218-9 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.InvalidRequestException: Error getting request for apiKey: PRODUCE, apiVersion: 9, connectionId: 10.3.108.16:9192-10.2.22.42:48218-9, listenerName: ListenerName(INSIDE), principal:User:ANONYMOUS
Caused by: java.lang.IllegalArgumentException: Varint is too long, the most significant bit in the 5th byte is set, converted value: b645ea89
        at org.apache.kafka.common.utils.ByteUtils.illegalVarintException(ByteUtils.java:453)
        at org.apache.kafka.common.utils.ByteUtils.readUnsignedVarint(ByteUtils.java:160)
        at org.apache.kafka.common.protocol.ByteBufferAccessor.readUnsignedVarint(ByteBufferAccessor.java:70)
        at org.apache.kafka.common.message.ProduceRequestData$TopicProduceData.read(ProduceRequestData.java:545)
        at org.apache.kafka.common.message.ProduceRequestData$TopicProduceData.<init>(ProduceRequestData.java:464)
        at org.apache.kafka.common.message.ProduceRequestData.read(ProduceRequestData.java:172)
        at org.apache.kafka.common.message.ProduceRequestData.<init>(ProduceRequestData.java:114)
        at org.apache.kafka.common.requests.ProduceRequest.parse(ProduceRequest.java:256)
        at org.apache.kafka.common.requests.AbstractRequest.doParseRequest(AbstractRequest.java:171)
        at org.apache.kafka.common.requests.AbstractRequest.parseRequest(AbstractRequest.java:165)
        at org.apache.kafka.common.requests.RequestContext.parseRequest(RequestContext.java:95)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:102)
        at kafka.network.Processor.$anonfun$processCompletedReceives$1(SocketServer.scala:1148)
        at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:647)
        at kafka.network.Processor.processCompletedReceives(SocketServer.scala:1126)
        at kafka.network.Processor.run(SocketServer.scala:1012)
        at java.base/java.lang.Thread.run(Thread.java:833)

and

java.lang.NullPointerException: Cannot invoke "String.hashCode()" because the return value of "kafka.server.CachedPartition.topic()" is null

@twmb
Owner

twmb commented Aug 9, 2023

The entire cluster crashing is definitely a Kafka problem...
To know which line that is requires generating the serializers/deserializers in the Kafka source.

@alistairking
Contributor Author

Of course I haven't been able to reproduce this after I added the debug logging. It's possible it happens on partition creation, so I'll try that next week.

@alistairking
Contributor Author

OK, it looks like this error pops up just after we signal a broker to shut down (e.g., during a rolling restart). Now that I know how to reproduce it, I should be able to get debug logs later this week.

@twmb
Owner

twmb commented Aug 25, 2023

Any luck reproducing?

@twmb twmb added the waiting label Aug 25, 2023
@alistairking
Contributor Author

Yes and no. I've managed to reproduce it a few times now (indeed it pops up when we restart a broker), but I lost the build I had running that had debug logs enabled.
Sorry for the delay, I've been fighting a bunch of fires this week. I'll try and get the debug build back out there next week and repro.

@twmb twmb added the bug Something isn't working label Aug 26, 2023
@twmb
Owner

twmb commented Aug 27, 2023

I am fairly certain I see the bug; a fix is WIP.

twmb added a commit that referenced this issue Aug 27, 2023
When using a fetch session, if we stop fetching a topic or partition, we
send that information in the fetch request.

If we forget an entire topic, we no longer add any cursor for the topic
internally (we are outright no longer fetching it), so we previously had
no topic ID available for the forgotten topic. The fetch request would
then carry the forgotten topic without its ID, and this would cause an
NPE in Kafka.

Now, when we add a topic to the session, we also save the topic ID.
We use this for two purposes:
* Now we correctly send the forgotten topic ID
* We also can pin fetch requests to non-topic-ID versions if any
  topic is missing an ID at any point in the session (i.e. if a
  forgotten topic has no ID)

Lastly, we add a guard in metadata updating to ignore updates that lack
topic IDs if we previously had a topic ID.

Closes #535.
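
The idea in the commit message above can be sketched roughly as follows. This is a minimal, simplified illustration and not franz-go's actual code: `fetchSession`, `forgottenTopic`, `add`, `forget`, and `maxFetchVersion` are hypothetical names invented for this example. It also assumes that fetch v13 is the first request version that identifies topics by ID, so v12 is used as the "non-topic-ID" ceiling.

```go
package main

import "fmt"

// topicID is a 16-byte UUID; the all-zero value means "no ID known",
// which matches the topicId=AAAAAAAAAAAAAAAAAAAAAA seen in the broker log.
type topicID [16]byte

// fetchSession is a hypothetical, simplified stand-in for client-side
// fetch session state. It is NOT franz-go's real type.
type fetchSession struct {
	ids       map[string]topicID // topic name -> ID saved when the topic joined the session
	missingID bool               // set once any topic in the session lacked an ID
}

func newFetchSession() *fetchSession {
	return &fetchSession{ids: make(map[string]topicID)}
}

// add records the topic ID at the moment the topic enters the session,
// so the ID is still available later when the topic is forgotten.
func (s *fetchSession) add(topic string, id topicID) {
	s.ids[topic] = id
	if id == (topicID{}) {
		s.missingID = true
	}
}

// forgottenTopic carries the two identifying fields of a forgotten topic.
type forgottenTopic struct {
	Topic   string
	TopicID topicID
}

// forget removes the topic from the session and returns the entry to send,
// filled with the ID saved by add rather than a zero ID.
func (s *fetchSession) forget(topic string) forgottenTopic {
	id, ok := s.ids[topic]
	if !ok || id == (topicID{}) {
		s.missingID = true // no usable ID: fall back to name-based requests
	}
	delete(s.ids, topic)
	return forgottenTopic{Topic: topic, TopicID: id}
}

// maxFetchVersion pins the request to a name-based version if any topic
// was missing an ID at any point in the session.
func (s *fetchSession) maxFetchVersion(clientMax int16) int16 {
	const lastNameBasedVersion = 12 // assumption: v13+ identify topics by ID
	if s.missingID && clientMax > lastNameBasedVersion {
		return lastNameBasedVersion
	}
	return clientMax
}

func main() {
	s := newFetchSession()
	s.add("events", topicID{0x01})
	ft := s.forget("events")
	fmt.Printf("forgotten %q carries an ID: %v\n", ft.Topic, ft.TopicID != (topicID{}))
	fmt.Println("max fetch version:", s.maxFetchVersion(15))
}
```

In this sketch the pin is sticky: once `missingID` is set it stays set for the life of the session, mirroring the "at any point in the session" wording in the commit message.
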
@twmb
Owner

twmb commented Aug 27, 2023

If you're able, can you try this branch?
https://github.com/twmb/franz-go/compare/535
