Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,23 @@
import org.springframework.core.MethodIntrospector;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.kafka.config.KafkaListenerConfigUtils;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.HeaderMethodArgumentResolver;
import org.springframework.messaging.handler.annotation.support.HeadersMethodArgumentResolver;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageMethodArgumentResolver;
import org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
Expand Down Expand Up @@ -612,10 +620,29 @@ private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
defaultFactory.setBeanFactory(KafkaListenerAnnotationBeanPostProcessor.this.beanFactory);
ConfigurableBeanFactory cbf =
(KafkaListenerAnnotationBeanPostProcessor.this.beanFactory instanceof ConfigurableBeanFactory ?
(ConfigurableBeanFactory) KafkaListenerAnnotationBeanPostProcessor.this.beanFactory : null);
DefaultFormattingConversionService conversionService = new DefaultFormattingConversionService();
defaultFactory.setConversionService(conversionService);

List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();
// Annotation-based argument resolution
argumentResolvers.add(new HeaderMethodArgumentResolver(conversionService, cbf));
argumentResolvers.add(new HeadersMethodArgumentResolver());

// Type-based argument resolution
argumentResolvers.add(new MessageMethodArgumentResolver());
argumentResolvers.add(new PayloadArgumentResolver(new GenericMessageConverter(conversionService)) {
@Override
protected boolean isEmptyPayload(Object payload) {
return payload == null || payload instanceof KafkaNull;
}
});
defaultFactory.setArgumentResolvers(argumentResolvers);

defaultFactory.afterPropertiesSet();
return defaultFactory;
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2016 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.support;

/**
* This class represents NULL Kafka payload.
*
* @author Dariusz Szablinski
*/
public final class KafkaNull {
/**
* Instance of KafkaNull.
*/
public final static KafkaNull INSTANCE = new KafkaNull();

private KafkaNull() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
Expand Down Expand Up @@ -105,7 +106,7 @@ protected Object convertPayload(Message<?> message) {
* @return the value.
*/
protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
return record.value();
return record.value() == null ? KafkaNull.INSTANCE : record.value();
}

@SuppressWarnings("serial")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.kafka.support.KafkaNull;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -66,8 +67,12 @@ protected Object convertPayload(Message<?> message) {

@Override
protected Object extractAndConvertValue(ConsumerRecord<?, ?> record, Type type) {
JavaType javaType = TypeFactory.defaultInstance().constructType(type);
Object value = record.value();
if (record.value() == null) {
return KafkaNull.INSTANCE;
}

JavaType javaType = TypeFactory.defaultInstance().constructType(type);
if (value instanceof String) {
try {
return this.objectMapper.readValue((String) value, javaType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public class EnableKafkaIntegrationTests {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, "annotated1", "annotated2", "annotated3",
"annotated4", "annotated5", "annotated6", "annotated7", "annotated8", "annotated9", "annotated10",
"annotated11");
"annotated11", "annotated12", "annotated13");

@Autowired
public IfaceListenerImpl ifaceListener;
Expand Down Expand Up @@ -212,6 +212,18 @@ public void testJson() throws Exception {
assertThat(this.listener.foo.getBar()).isEqualTo("bar");
}

@Test
public void testNulls() throws Exception {
template.send("annotated12", null, null);
assertThat(this.listener.latch8.await(60, TimeUnit.SECONDS)).isTrue();
}

@Test
public void testEmpty() throws Exception {
template.send("annotated13", null, "");
assertThat(this.listener.latch9.await(60, TimeUnit.SECONDS)).isTrue();
}

@Configuration
@EnableKafka
@EnableTransactionManagement(proxyTargetClass = true)
Expand Down Expand Up @@ -394,6 +406,10 @@ static class Listener {

private final CountDownLatch latch7 = new CountDownLatch(1);

private final CountDownLatch latch8 = new CountDownLatch(1);

private final CountDownLatch latch9 = new CountDownLatch(1);

private final CountDownLatch eventLatch = new CountDownLatch(1);

private volatile Integer partition;
Expand Down Expand Up @@ -473,6 +489,18 @@ public void listen7(String foo) {
this.latch7.countDown();
}

@KafkaListener(id = "quux", topics = "annotated12")
public void listen8(@Payload(required = false) String none) {
assertThat(none).isNull();
this.latch8.countDown();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add assertion isNull(none)

}

@KafkaListener(id = "corge", topics = "annotated13")
public void listen9(Object payload) {
assertThat(payload).isNotNull();
this.latch9.countDown();
}

}

interface IfaceListener<T> {
Expand Down