Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
This is on the main branch.
Describe the bug
KafkaTemplate has a method:
@Override
@Nullable
public ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout) {
    Properties props = oneOnly();
    try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, props)) {
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        return receiveOne(topicPartition, offset, pollTimeout, consumer);
    }
}
This method correctly declares that the return value can be null via @Nullable, and the Javadoc states the same.
However, there is another method:
@Override
public ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout) {
    Properties props = oneOnly();
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = new LinkedHashMap<>();
    try (Consumer<K, V> consumer = this.consumerFactory.createConsumer(null, null, null, props)) {
        requested.forEach(tpo -> {
            ConsumerRecord<K, V> one = receiveOne(tpo.getTopicPartition(), tpo.getOffset(), pollTimeout, consumer);
            records.computeIfAbsent(tpo.getTopicPartition(), tp -> new ArrayList<>()).add(one);
        });
        return new ConsumerRecords<>(records);
    }
}
This method returns a ConsumerRecords object, but receiveOne can return null, so any ConsumerRecord added to it may be null. The result is a ConsumerRecords whose per-partition lists contain null elements.
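To illustrate the mechanism without a broker, here is a minimal plain-Java sketch of the same computeIfAbsent(...).add(...) pattern. It is not the KafkaTemplate code: NullElementDemo, fetchOne, and collect are hypothetical names, String stands in for both TopicPartition and ConsumerRecord, and fetchOne always returns null to mimic polling an empty topic.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NullElementDemo {

    // Hypothetical stand-in for receiveOne(...): returns null because
    // nothing is available, as on a brand-new empty topic.
    static String fetchOne(String partition) {
        return null;
    }

    // Same shape as the loop in receive(Collection, Duration):
    // the fetched value is added to the list without a null check.
    static Map<String, List<String>> collect(List<String> partitions) {
        Map<String, List<String>> records = new LinkedHashMap<>();
        for (String partition : partitions) {
            String one = fetchOne(partition);
            records.computeIfAbsent(partition, p -> new ArrayList<>()).add(one);
        }
        return records;
    }

    public static void main(String[] args) {
        // The list for the partition has size 1, and its only element is null.
        System.out.println(collect(List.of("empty-topic-0"))); // prints {empty-topic-0=[null]}
    }
}
```

A caller iterating over such a result would hit a NullPointerException on the first access to a record's fields, even though the returned container itself is non-null.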
To Reproduce
I reproduced it by calling receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout) against a brand-new, empty topic. The Javadoc doesn't mention that the elements could be null.
Could we add a null check before adding the ConsumerRecord to the ConsumerRecords object?
Furthermore, tpo.getOffset() can also return null, which causes a NullPointerException on auto-unboxing when the value is passed to receiveOne.
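One possible shape for the two guards, sketched in plain Java rather than against the real Kafka types. Everything here is an assumption for illustration: NullSafeReceiveSketch, Request, fetchOne, and receive are hypothetical names, and the choices to reject a null offset with an IllegalArgumentException and to omit partitions that yield no record are just one option the maintainers might pick.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NullSafeReceiveSketch {

    // Hypothetical stand-in for TopicPartitionOffset: the offset is a
    // boxed Long and may therefore be null.
    public static final class Request {
        final String partition;
        final Long offset;

        public Request(String partition, Long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Hypothetical stand-in for receiveOne(...): takes a primitive long and
    // returns null for partitions whose name starts with "empty".
    static String fetchOne(String partition, long offset) {
        return partition.startsWith("empty") ? null : "record@" + offset;
    }

    public static Map<String, List<String>> receive(Collection<Request> requested) {
        Map<String, List<String>> records = new LinkedHashMap<>();
        for (Request request : requested) {
            // Guard 1: fail with a clear message instead of an unboxing NPE.
            if (request.offset == null) {
                throw new IllegalArgumentException(
                        "Offset is required for " + request.partition);
            }
            String one = fetchOne(request.partition, request.offset);
            // Guard 2: only add records that were actually received.
            if (one != null) {
                records.computeIfAbsent(request.partition, p -> new ArrayList<>()).add(one);
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<Request> requested = List.of(
                new Request("empty-0", 0L),
                new Request("orders-0", 5L));
        System.out.println(receive(requested)); // prints {orders-0=[record@5]}
    }
}
```

With this approach a caller can no longer tell "partition was requested but empty" from "partition was not requested" by looking at the result alone, so the Javadoc would need to say that partitions without a record are absent.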