Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -4663,6 +4663,7 @@ TIP: See <<tip-assign-all-parts>>.

This section describes how to handle various exceptions that may arise when you use Spring for Apache Kafka.

[[listener-error-handlers]]
===== Listener Error Handlers

Starting with version 2.0, the `@KafkaListener` annotation has a new attribute: `errorHandler`.
Expand All @@ -4686,6 +4687,26 @@ You have access to the spring-messaging `Message<?>` object produced by the mess
The error handler can throw the original or a new exception, which is thrown to the container.
Anything returned by the error handler is ignored.

Starting with version 2.7, you can set the `rawRecordHeader` property on the `MessagingMessageConverter` and `BatchMessagingMessageConverter` which causes the raw `ConsumerRecord` to be added to the converted `Message<?>` in the `KafkaHeaders.RAW_DATA` header.
This is useful, for example, if you wish to use a `DeadLetterPublishingRecoverer` in a listener error handler.
It might be used in a request/reply scenario where, after some number of retries, you wish to capture the failed record in a dead letter topic and then send a failure result to the sender.

====
[source, java]
----
@Bean
KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
----
====

It has a sub-interface (`ConsumerAwareListenerErrorHandler`) that has access to the consumer object, through the following method:

====
Expand Down
4 changes: 4 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ See <<message-listener-container>> for more information.
You can now validate the payload parameter of `@KafkaHandler` methods (class-level listeners).
See <<kafka-validation>> for more information.

You can now set the `rawRecordHeader` property on the `MessagingMessageConverter` and `BatchMessagingMessageConverter` which causes the raw `ConsumerRecord` to be added to the converted `Message<?>`.
This is useful, for example, if you wish to use a `DeadLetterPublishingRecoverer` in a listener error handler.
See <<listener-error-handlers>> for more information.

[[x27-dlt]]
==== `DeadLetterPublishingRecoverer` Changes

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -71,6 +71,8 @@ public class BatchMessagingMessageConverter implements BatchMessageConverter {

private KafkaHeaderMapper headerMapper;

private boolean rawRecordHeader;

/**
* Create an instance that does not convert the record values.
*/
Expand Down Expand Up @@ -123,6 +125,16 @@ public RecordMessageConverter getRecordMessageConverter() {
return this.recordConverter;
}

/**
 * Set to true to add the raw {@code List<ConsumerRecord<?, ?>>} as a header
 * {@link KafkaHeaders#RAW_DATA} on the converted message. Default false (the
 * header is not added).
 * @param rawRecordHeader true to add the header.
 * @since 2.7
 */
public void setRawRecordHeader(boolean rawRecordHeader) {
	this.rawRecordHeader = rawRecordHeader;
}

@Override
public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer, Type type) {
Expand All @@ -140,12 +152,16 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
List<Long> timestamps = new ArrayList<>();
List<Map<String, Object>> convertedHeaders = new ArrayList<>();
List<Headers> natives = new ArrayList<>();
List<ConsumerRecord<?, ?>> raws = new ArrayList<>();
if (this.headerMapper != null) {
rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders);
}
else {
rawHeaders.put(KafkaHeaders.NATIVE_HEADERS, natives);
}
if (this.rawRecordHeader) {
rawHeaders.put(KafkaHeaders.RAW_DATA, raws);
}
commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes,
timestamps);

Expand Down Expand Up @@ -177,6 +193,9 @@ public Message<?> toMessage(List<ConsumerRecord<?, ?>> records, @Nullable Acknow
}
natives.add(record.headers());
}
if (this.rawRecordHeader) {
raws.add(record);
}
}
return MessageBuilder.createMessage(payloads, kafkaMessageHeaders);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -62,6 +62,8 @@ public class MessagingMessageConverter implements RecordMessageConverter {

private KafkaHeaderMapper headerMapper;

private boolean rawRecordHeader;

public MessagingMessageConverter() {
if (JacksonPresent.isJackson2Present()) {
this.headerMapper = new DefaultKafkaHeaderMapper();
Expand Down Expand Up @@ -98,6 +100,16 @@ public void setHeaderMapper(KafkaHeaderMapper headerMapper) {
this.headerMapper = headerMapper;
}

/**
 * Set to true to add the raw {@link ConsumerRecord} as a header
 * {@link KafkaHeaders#RAW_DATA} on the converted message. Default false (the
 * header is not added).
 * @param rawRecordHeader true to add the header.
 * @since 2.7
 */
public void setRawRecordHeader(boolean rawRecordHeader) {
	this.rawRecordHeader = rawRecordHeader;
}

@Override
public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer,
Type type) {
Expand All @@ -119,7 +131,9 @@ public Message<?> toMessage(ConsumerRecord<?, ?> record, Acknowledgment acknowle
String ttName = record.timestampType() != null ? record.timestampType().name() : null;
commonHeaders(acknowledgment, consumer, rawHeaders, record.key(), record.topic(), record.partition(),
record.offset(), ttName, record.timestamp());

if (this.rawRecordHeader) {
rawHeaders.put(KafkaHeaders.RAW_DATA, record);
}
return MessageBuilder.createMessage(extractAndConvertValue(record, type), kafkaMessageHeaders);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,13 +44,14 @@
/**
* @author Biju Kunjummen
* @author Artem Bilan
* @author Gary Russell
*
* @since 1.3
*/
public class BatchMessageConverterTests {

@Test
public void testBatchConverters() {
void testBatchConverters() {
BatchMessageConverter batchMessageConverter = new BatchMessagingMessageConverter();

MessageHeaders headers = testGuts(batchMessageConverter);
Expand All @@ -61,10 +62,11 @@ public void testBatchConverters() {
Map<String, Object> map = converted.get(0);
assertThat(map).hasSize(1);
assertThat(new String((byte[]) map.get("foo"))).isEqualTo("bar");
assertThat(headers.get(KafkaHeaders.RAW_DATA)).isNull();
}

@Test
public void testNoMapper() {
void testNoMapper() {
BatchMessagingMessageConverter batchMessageConverter = new BatchMessagingMessageConverter();
batchMessageConverter.setHeaderMapper(null);

Expand All @@ -79,16 +81,26 @@ public void testNoMapper() {
assertThat(new String(next.value())).isEqualTo("bar");
}

@Test
void raw() {
	// With rawRecordHeader enabled, the batch converter must expose the original
	// ConsumerRecords under KafkaHeaders.RAW_DATA alongside the mapped headers.
	BatchMessagingMessageConverter converter = new BatchMessagingMessageConverter();
	converter.setRawRecordHeader(true);
	MessageHeaders headers = testGuts(converter);
	@SuppressWarnings("unchecked")
	List<Map<String, Object>> mappedHeaders = (List<Map<String, Object>>) headers
			.get(KafkaHeaders.BATCH_CONVERTED_HEADERS);
	assertThat(mappedHeaders).hasSize(3);
	Map<String, Object> firstRecordHeaders = mappedHeaders.get(0);
	assertThat(firstRecordHeaders).hasSize(1);
	assertThat(new String((byte[]) firstRecordHeaders.get("foo"))).isEqualTo("bar");
	@SuppressWarnings("unchecked")
	List<ConsumerRecord<?, ?>> rawRecords = headers.get(KafkaHeaders.RAW_DATA, List.class);
	assertThat(rawRecords).extracting(rec -> (String) rec.value())
			.containsExactly("value1", "value2", "value3");
}

private MessageHeaders testGuts(BatchMessageConverter batchMessageConverter) {
Header header = new RecordHeader("foo", "bar".getBytes());
Headers kHeaders = new RecordHeaders(new Header[] { header });
List<ConsumerRecord<?, ?>> consumerRecords = new ArrayList<>();
consumerRecords.add(new ConsumerRecord<>("topic1", 0, 1, 1487694048607L,
TimestampType.CREATE_TIME, 123L, 2, 3, "key1", "value1", kHeaders));
consumerRecords.add(new ConsumerRecord<>("topic1", 0, 2, 1487694048608L,
TimestampType.CREATE_TIME, 123L, 2, 3, "key2", "value2", kHeaders));
consumerRecords.add(new ConsumerRecord<>("topic1", 0, 3, 1487694048609L,
TimestampType.CREATE_TIME, 123L, 2, 3, "key3", "value3", kHeaders));
List<ConsumerRecord<?, ?>> consumerRecords = recordList();


Acknowledgment ack = mock(Acknowledgment.class);
Expand Down Expand Up @@ -118,6 +130,19 @@ private MessageHeaders testGuts(BatchMessageConverter batchMessageConverter) {
return headers;
}

/**
 * Build three test records on topic1/partition 0 with offsets 1-3, consecutive
 * create-time timestamps, keys "key1".."key3", values "value1".."value3", and a
 * single "foo" -> "bar" record header on each.
 */
private List<ConsumerRecord<?, ?>> recordList() {
	Headers kHeaders = new RecordHeaders(new Header[] { new RecordHeader("foo", "bar".getBytes()) });
	List<ConsumerRecord<?, ?>> records = new ArrayList<>();
	long baseTimestamp = 1487694048607L;
	for (int i = 0; i < 3; i++) {
		records.add(new ConsumerRecord<>("topic1", 0, i + 1, baseTimestamp + i,
				TimestampType.CREATE_TIME, 123L, 2, 3, "key" + (i + 1), "value" + (i + 1), kHeaders));
	}
	return records;
}

@SuppressWarnings("unchecked")
@Test
public void missingHeaders() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 the original author or authors.
* Copyright 2019-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,6 +41,7 @@ void missingHeaders() {
assertThat(message.getPayload()).isEqualTo("baz");
assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("foo");
assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo("bar");
assertThat(message.getHeaders().get(KafkaHeaders.RAW_DATA)).isNull();
}

@Test
Expand All @@ -53,4 +54,16 @@ void dontMapNullKey() {
assertThat(message.getHeaders().containsKey(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isFalse();
}

@Test
void raw() {
	// With rawRecordHeader enabled, the converted message must carry the very same
	// ConsumerRecord instance under KafkaHeaders.RAW_DATA.
	MessagingMessageConverter converter = new MessagingMessageConverter();
	converter.setRawRecordHeader(true);
	ConsumerRecord<String, String> consumerRecord =
			new ConsumerRecord<>("foo", 1, 42, -1L, null, 0L, 0, 0, "bar", "baz");
	Message<?> converted = converter.toMessage(consumerRecord, null, null, null);
	assertThat(converted.getPayload()).isEqualTo("baz");
	assertThat(converted.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("foo");
	assertThat(converted.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo("bar");
	assertThat(converted.getHeaders().get(KafkaHeaders.RAW_DATA)).isSameAs(consumerRecord);
}

}