-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Description
Hello spring-kafka developers. I'd like make an enhancement proposal
Problem description
An already running kafka consumer can run upon AuthenticationException (not to be mixed up with a AuthorizationException) in poll(). When this happens exception will be handled by ListenerConsumer#handleConsumerException() method which will try to delegate to ErrorHandler and when no handler was set it will just log it and proceed with polling. When no error handler is present this will result in storm of logging errors which won't be stopped until someone fixes it.
Example stacktrace:
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.common.errors.SslAuthenticationException's; no record information is available
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:151)
at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1427)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1124)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:369)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:312)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:307)
at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1357)
at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1232)
at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1175)
at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:396)
at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:480)
at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1267)
at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1254)
at java.base/java.security.AccessController.doPrivileged(AccessController.java:691)
at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1199)
at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:430)
at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:514)
at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:368)
at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:291)
at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:173)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:544)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1301)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1165)
at jdk.internal.reflect.GeneratedMethodAccessor136.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:208)
at jdk.proxy2/jdk.proxy2.$Proxy240.poll(Unknown Source)
at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:89)
at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:83)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1271)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1162)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1075)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)
at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)
at java.base/sun.security.validator.Validator.validate(Validator.java:264)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:285)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144)
at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335)
... 35 common frames omitted
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
at java.base/java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297)
at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434)
... 40 common frames omitted
Proposed solution
I suggest adding a trivial fix in KafkaMessageListenerContainer.ListenerConsumer#run() to handle these exceptions just like AuthorizationExceptions:
diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
--- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java (revision 725cab9d247b93bfd346fff9a9dd7dc32a9f9b9d)
+++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java (date 1623411758510)
@@ -59,6 +59,7 @@
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.ProducerFencedException;
@@ -1169,7 +1170,7 @@
exitThrowable = nofpe;
break;
}
- catch (AuthorizationException ae) {
+ catch (AuthorizationException | AuthenticationException ae) {
if (this.authorizationExceptionRetryInterval == null) {
ListenerConsumer.this.logger.error(ae, "Authorization Exception and no authorizationExceptionRetryInterval set");
this.fatalError = true;We can also add a separate catch-block to process these kind of exceptions with another strategy but this seems redundant.
Available workarounds
Now you can define an error handler to handle these exceptions. For example there are ContainerStoppingErrorHandler and ContainerStoppingBatchErrorHandler
What do you think about this?