diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index 1c03cc77d3..fa11a8504c 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -4663,6 +4663,7 @@ TIP: See <>. This section describes how to handle various exceptions that may arise when you use Spring for Apache Kafka. +[[listener-error-handlers]] ===== Listener Error Handlers Starting with version 2.0, the `@KafkaListener` annotation has a new attribute: `errorHandler`. @@ -4686,6 +4687,26 @@ You have access to the spring-messaging `Message` object produced by the mess The error handler can throw the original or a new exception, which is thrown to the container. Anything returned by the error handler is ignored. +Starting with version 2.7, you can set the `rawRecordHeader` property on the `MessagingMessageConverter` and `BatchMessagingMessageConverter` which causes the raw `ConsumerRecord` to be added to the converted `Message` in the `KafkaHeaders.RAW_DATA` header. +This is useful, for example, if you wish to use a `DeadLetterPublishingRecoverer` in a listener error handler. +It might be used in a request/reply scenario where you wish to send a failure result to the sender, after some number of retries, after capturing the failed record in a dead letter topic. + +==== +[source, java] +---- +@Bean +KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) { + return (msg, ex) -> { + if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) { + recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex); + return "FAILED"; + } + throw ex; + }; +} +---- +==== + It has a sub-interface (`ConsumerAwareListenerErrorHandler`) that has access to the consumer object, through the following method: ==== diff --git a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc index 8c8d08874f..b69e1eb4cf 100644 --- a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc +++ b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc @@ -35,6 +35,10 @@ See <> for more information. You can now validate the payload parameter of `@KafkaHandler` methods (class-level listeners). See <> for more information. +You can now set the `rawRecordHeader` property on the `MessagingMessageConverter` and `BatchMessagingMessageConverter` which causes the raw `ConsumerRecord` to be added to the converted `Message`. +This is useful, for example, if you wish to use a `DeadLetterPublishingRecoverer` in a listener error handler. +See <> for more information. + [[x27-dlt]] ==== `DeadLetterPublishingRecover` Changes diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index d7df9a11b2..ee792dff2b 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -71,6 +71,8 @@ public class BatchMessagingMessageConverter implements BatchMessageConverter { private KafkaHeaderMapper headerMapper; + private boolean rawRecordHeader; + /** * Create an instance that does not convert the record values. */ @@ -123,6 +125,16 @@ public RecordMessageConverter getRecordMessageConverter() { return this.recordConverter; } + /** + * Set to true to add the raw {@code List>} as a header + * {@link KafkaHeaders#RAW_DATA}. + * @param rawRecordHeader true to add the header. + * @since 2.7 + */ + public void setRawRecordHeader(boolean rawRecordHeader) { + this.rawRecordHeader = rawRecordHeader; + } + @Override public Message toMessage(List> records, @Nullable Acknowledgment acknowledgment, Consumer consumer, Type type) { @@ -140,12 +152,16 @@ public Message toMessage(List> records, @Nullable Acknow List timestamps = new ArrayList<>(); List> convertedHeaders = new ArrayList<>(); List natives = new ArrayList<>(); + List> raws = new ArrayList<>(); if (this.headerMapper != null) { rawHeaders.put(KafkaHeaders.BATCH_CONVERTED_HEADERS, convertedHeaders); } else { rawHeaders.put(KafkaHeaders.NATIVE_HEADERS, natives); } + if (this.rawRecordHeader) { + rawHeaders.put(KafkaHeaders.RAW_DATA, raws); + } commonHeaders(acknowledgment, consumer, rawHeaders, keys, topics, partitions, offsets, timestampTypes, timestamps); @@ -177,6 +193,9 @@ public Message toMessage(List> records, @Nullable Acknow } natives.add(record.headers()); } + if (this.rawRecordHeader) { + raws.add(record); + } } return MessageBuilder.createMessage(payloads, kafkaMessageHeaders); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java index a12fd4a121..865b3d1026 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/MessagingMessageConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2019 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -62,6 +62,8 @@ public class MessagingMessageConverter implements RecordMessageConverter { private KafkaHeaderMapper headerMapper; + private boolean rawRecordHeader; + public MessagingMessageConverter() { if (JacksonPresent.isJackson2Present()) { this.headerMapper = new DefaultKafkaHeaderMapper(); @@ -98,6 +100,16 @@ public void setHeaderMapper(KafkaHeaderMapper headerMapper) { this.headerMapper = headerMapper; } + /** + * Set to true to add the raw {@link ConsumerRecord} as a header + * {@link KafkaHeaders#RAW_DATA}. + * @param rawRecordHeader true to add the header. + * @since 2.7 + */ + public void setRawRecordHeader(boolean rawRecordHeader) { + this.rawRecordHeader = rawRecordHeader; + } + @Override public Message toMessage(ConsumerRecord record, Acknowledgment acknowledgment, Consumer consumer, Type type) { @@ -119,7 +131,9 @@ public Message toMessage(ConsumerRecord record, Acknowledgment acknowle String ttName = record.timestampType() != null ? record.timestampType().name() : null; commonHeaders(acknowledgment, consumer, rawHeaders, record.key(), record.topic(), record.partition(), record.offset(), ttName, record.timestamp()); - + if (this.rawRecordHeader) { + rawHeaders.put(KafkaHeaders.RAW_DATA, record); + } return MessageBuilder.createMessage(extractAndConvertValue(record, type), kafkaMessageHeaders); } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java index 8bb5ef1b09..3ad67d188a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/BatchMessageConverterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2019 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,13 +44,14 @@ /** * @author Biju Kunjummen * @author Artem Bilan + * @author Gary Russell * * @since 1.3 */ public class BatchMessageConverterTests { @Test - public void testBatchConverters() { + void testBatchConverters() { BatchMessageConverter batchMessageConverter = new BatchMessagingMessageConverter(); MessageHeaders headers = testGuts(batchMessageConverter); @@ -61,10 +62,11 @@ public void testBatchConverters() { Map map = converted.get(0); assertThat(map).hasSize(1); assertThat(new String((byte[]) map.get("foo"))).isEqualTo("bar"); + assertThat(headers.get(KafkaHeaders.RAW_DATA)).isNull(); } @Test - public void testNoMapper() { + void testNoMapper() { BatchMessagingMessageConverter batchMessageConverter = new BatchMessagingMessageConverter(); batchMessageConverter.setHeaderMapper(null); @@ -79,16 +81,26 @@ public void testNoMapper() { assertThat(new String(next.value())).isEqualTo("bar"); } + @Test + void raw() { + BatchMessagingMessageConverter batchMessageConverter = new BatchMessagingMessageConverter(); + batchMessageConverter.setRawRecordHeader(true); + MessageHeaders headers = testGuts(batchMessageConverter); + @SuppressWarnings("unchecked") + List> converted = (List>) headers + .get(KafkaHeaders.BATCH_CONVERTED_HEADERS); + assertThat(converted).hasSize(3); + Map map = converted.get(0); + assertThat(map).hasSize(1); + assertThat(new String((byte[]) map.get("foo"))).isEqualTo("bar"); + @SuppressWarnings("unchecked") + List> rawHeader = headers.get(KafkaHeaders.RAW_DATA, List.class); + assertThat(rawHeader).extracting(rec -> (String) rec.value()) + .containsExactly("value1", "value2", "value3"); + } + private MessageHeaders testGuts(BatchMessageConverter batchMessageConverter) { - Header header = new RecordHeader("foo", "bar".getBytes()); - Headers kHeaders = new RecordHeaders(new Header[] { header }); - List> consumerRecords = new ArrayList<>(); - consumerRecords.add(new ConsumerRecord<>("topic1", 0, 1, 1487694048607L, - TimestampType.CREATE_TIME, 123L, 2, 3, "key1", "value1", kHeaders)); - consumerRecords.add(new ConsumerRecord<>("topic1", 0, 2, 1487694048608L, - TimestampType.CREATE_TIME, 123L, 2, 3, "key2", "value2", kHeaders)); - consumerRecords.add(new ConsumerRecord<>("topic1", 0, 3, 1487694048609L, - TimestampType.CREATE_TIME, 123L, 2, 3, "key3", "value3", kHeaders)); + List> consumerRecords = recordList(); Acknowledgment ack = mock(Acknowledgment.class); @@ -118,6 +130,19 @@ private MessageHeaders testGuts(BatchMessageConverter batchMessageConverter) { return headers; } + private List> recordList() { + Header header = new RecordHeader("foo", "bar".getBytes()); + Headers kHeaders = new RecordHeaders(new Header[] { header }); + List> consumerRecords = new ArrayList<>(); + consumerRecords.add(new ConsumerRecord<>("topic1", 0, 1, 1487694048607L, + TimestampType.CREATE_TIME, 123L, 2, 3, "key1", "value1", kHeaders)); + consumerRecords.add(new ConsumerRecord<>("topic1", 0, 2, 1487694048608L, + TimestampType.CREATE_TIME, 123L, 2, 3, "key2", "value2", kHeaders)); + consumerRecords.add(new ConsumerRecord<>("topic1", 0, 3, 1487694048609L, + TimestampType.CREATE_TIME, 123L, 2, 3, "key3", "value3", kHeaders)); + return consumerRecords; + } + @SuppressWarnings("unchecked") @Test public void missingHeaders() { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/support/converter/MessagingMessageConverterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/MessagingMessageConverterTests.java index e80dc4a71d..3eabdad16a 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/support/converter/MessagingMessageConverterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/support/converter/MessagingMessageConverterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,6 +41,7 @@ void missingHeaders() { assertThat(message.getPayload()).isEqualTo("baz"); assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("foo"); assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo("bar"); + assertThat(message.getHeaders().get(KafkaHeaders.RAW_DATA)).isNull(); } @Test @@ -53,4 +54,16 @@ void dontMapNullKey() { assertThat(message.getHeaders().containsKey(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isFalse(); } + @Test + void raw() { + MessagingMessageConverter converter = new MessagingMessageConverter(); + converter.setRawRecordHeader(true); + ConsumerRecord record = new ConsumerRecord<>("foo", 1, 42, -1L, null, 0L, 0, 0, "bar", "baz"); + Message message = converter.toMessage(record, null, null, null); + assertThat(message.getPayload()).isEqualTo("baz"); + assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC)).isEqualTo("foo"); + assertThat(message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY)).isEqualTo("bar"); + assertThat(message.getHeaders().get(KafkaHeaders.RAW_DATA)).isSameAs(record); + } + }