Skip to content

Commit 7badf42

Browse files
authored
GH-1747: Add ConsumerAwareRecordInterceptor
Resolves #1747 Also add `Consumer` to the new 2.7 methods and batch interceptor. * Deprecate old 'intercept()`.
1 parent 5d40c80 commit 7badf42

File tree

10 files changed

+127
-37
lines changed

10 files changed

+127
-37
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1009,6 +1009,8 @@ Access to the `Consumer` object is provided.
10091009
IMPORTANT: The `Consumer` object is not thread-safe.
10101010
You must only invoke its methods on the thread that calls the listener.
10111011

1012+
IMPORTANT: You should not execute any `Consumer<?, ?>` methods that affect the consumer's positions and or committed offsets in your listener; the container needs to manage such information.
1013+
10121014
[[message-listener-container]]
10131015
===== Message Listener Containers
10141016

@@ -1024,8 +1026,12 @@ Starting with version 2.2.7, you can add a `RecordInterceptor` to the listener c
10241026
If the interceptor returns null, the listener is not called.
10251027
Starting with version 2.7, it has additional methods which are called after the listener exits (normally, or by throwing an exception).
10261028
Also, starting with version 2.7, there is now a `BatchInterceptor`, providing similar functionality for <<batch-listeners>>.
1029+
In addition, the `ConsumerAwareRecordInterceptor` (and `BatchInterceptor`) provide access to the `Consumer<?, ?>`.
1030+
This might be used, for example, to access the consumer metrics in the interceptor.
1031+
1032+
IMPORTANT: You should not execute any methods that affect the consumer's positions and or committed offsets in these interceptors; the container needs to manage such information.
10271033

1028-
Starting with version 2.3, the `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to invoke multiple interceptors.
1034+
The `CompositeRecordInterceptor` and `CompositeBatchInterceptor` can be used to invoke multiple interceptors.
10291035

10301036
By default, when using transactions, the interceptor is invoked after the transaction has started.
10311037
Starting with version 2.3.4, you can set the listener container's `interceptBeforeTx` property to invoke the interceptor before the transaction has started instead.

spring-kafka-docs/src/main/asciidoc/whats-new.adoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ Error handlers and after rollback processors that extend `FailedRecordProcessor`
2525
See See <<after-rollback>>, <<seek-to-current>>, and <<recovering-batch-eh>> for more information.
2626

2727
The `RecordInterceptor` now has additional methods called after the listener returns (normally, or by throwing an exception).
28+
It also has a sub-interface `ConsumerAwareRecordInterceptor`.
2829
In addition, there is now a `BatchInterceptor` for batch listeners.
2930
See <<message-listener-container>> for more information.
3031

spring-kafka/src/main/java/org/springframework/kafka/listener/BatchInterceptor.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import org.apache.kafka.clients.consumer.Consumer;
1920
import org.apache.kafka.clients.consumer.ConsumerRecords;
2021

2122
import org.springframework.lang.Nullable;
@@ -37,24 +38,27 @@ public interface BatchInterceptor<K, V> {
3738
* Perform some action on the records or return a different one. If null is returned
3839
* the records will be skipped. Invoked before the listener.
3940
* @param records the records.
41+
* @param consumer the consumer.
4042
* @return the records or null.
4143
*/
4244
@Nullable
43-
ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records);
45+
ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records, Consumer<K, V> consumer);
4446

4547
/**
4648
* Called after the listener exits normally.
4749
* @param records the records.
50+
* @param consumer the consumer.
4851
*/
49-
default void success(ConsumerRecords<K, V> records) {
52+
default void success(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
5053
}
5154

5255
/**
5356
* Called after the listener throws an exception.
5457
* @param records the records.
5558
* @param exception the exception.
59+
* @param consumer the consumer.
5660
*/
57-
default void failure(ConsumerRecords<K, V> records, Exception exception) {
61+
default void failure(ConsumerRecords<K, V> records, Exception exception, Consumer<K, V> consumer) {
5862
}
5963

6064
}

spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeBatchInterceptor.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2019 the original author or authors.
2+
* Copyright 2019-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
2020
import java.util.Arrays;
2121
import java.util.Collection;
2222

23+
import org.apache.kafka.clients.consumer.Consumer;
2324
import org.apache.kafka.clients.consumer.ConsumerRecords;
2425

2526
import org.springframework.util.Assert;
@@ -52,22 +53,25 @@ public CompositeBatchInterceptor(BatchInterceptor<K, V>... delegates) {
5253
}
5354

5455
@Override
55-
public ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records) {
56+
public ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
5657
ConsumerRecords<K, V> recordsToIntercept = records;
5758
for (BatchInterceptor<K, V> delegate : this.delegates) {
58-
recordsToIntercept = delegate.intercept(recordsToIntercept);
59+
recordsToIntercept = delegate.intercept(recordsToIntercept, consumer);
60+
if (recordsToIntercept == null) {
61+
break;
62+
}
5963
}
6064
return recordsToIntercept;
6165
}
6266

6367
@Override
64-
public void success(ConsumerRecords<K, V> records) {
65-
this.delegates.forEach(del -> del.success(records));
68+
public void success(ConsumerRecords<K, V> records, Consumer<K, V> consumer) {
69+
this.delegates.forEach(del -> del.success(records, consumer));
6670
}
6771

6872
@Override
69-
public void failure(ConsumerRecords<K, V> records, Exception exception) {
70-
this.delegates.forEach(del -> del.failure(records, exception));
73+
public void failure(ConsumerRecords<K, V> records, Exception exception, Consumer<K, V> consumer) {
74+
this.delegates.forEach(del -> del.failure(records, exception, consumer));
7175
}
7276

7377
}

spring-kafka/src/main/java/org/springframework/kafka/listener/CompositeRecordInterceptor.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
import java.util.Arrays;
2121
import java.util.Collection;
2222

23+
import org.apache.kafka.clients.consumer.Consumer;
2324
import org.apache.kafka.clients.consumer.ConsumerRecord;
2425

26+
import org.springframework.lang.Nullable;
2527
import org.springframework.util.Assert;
2628

2729
/**
@@ -36,7 +38,7 @@
3638
* @since 2.3
3739
*
3840
*/
39-
public class CompositeRecordInterceptor<K, V> implements RecordInterceptor<K, V> {
41+
public class CompositeRecordInterceptor<K, V> implements ConsumerAwareRecordInterceptor<K, V> {
4042

4143
private final Collection<RecordInterceptor<K, V>> delegates = new ArrayList<>();
4244

@@ -53,22 +55,26 @@ public CompositeRecordInterceptor(RecordInterceptor<K, V>... delegates) {
5355
}
5456

5557
@Override
56-
public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record) {
58+
@Nullable
59+
public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
5760
ConsumerRecord<K, V> recordToIntercept = record;
5861
for (RecordInterceptor<K, V> delegate : this.delegates) {
59-
recordToIntercept = delegate.intercept(recordToIntercept);
62+
recordToIntercept = delegate.intercept(recordToIntercept, consumer);
63+
if (recordToIntercept == null) {
64+
break;
65+
}
6066
}
6167
return recordToIntercept;
6268
}
6369

6470
@Override
65-
public void success(ConsumerRecord<K, V> record) {
66-
this.delegates.forEach(del -> del.success(record));
71+
public void success(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
72+
this.delegates.forEach(del -> del.success(record, consumer));
6773
}
6874

6975
@Override
70-
public void failure(ConsumerRecord<K, V> record, Exception exception) {
71-
this.delegates.forEach(del -> del.failure(record, exception));
76+
public void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K, V> consumer) {
77+
this.delegates.forEach(del -> del.failure(record, exception, consumer));
7278
}
7379

7480
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2021 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.apache.kafka.clients.consumer.Consumer;
20+
import org.apache.kafka.clients.consumer.ConsumerRecord;
21+
22+
import org.springframework.lang.Nullable;
23+
24+
/**
25+
* A {@link RecordInterceptor} that has access to the {@link Consumer}.
26+
*
27+
* @param <K> the key type.
28+
* @param <V> the value type.
29+
*
30+
* @author Gary Russell
31+
* @since 2.7
32+
*
33+
*/
34+
@FunctionalInterface
35+
public interface ConsumerAwareRecordInterceptor<K, V> extends RecordInterceptor<K, V> {
36+
37+
@SuppressWarnings("deprecation")
38+
@Override
39+
@Nullable
40+
default ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record) {
41+
throw new UnsupportedOperationException("Container should never call this");
42+
}
43+
44+
@Override
45+
@Nullable
46+
ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record, Consumer<K, V> consumer);
47+
48+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1789,7 +1789,7 @@ protected void doInTransactionWithoutResult(TransactionStatus status) {
17891789
}
17901790

17911791
private void batchAfterRollback(final ConsumerRecords<K, V> records,
1792-
final List<ConsumerRecord<K, V>> recordList, RuntimeException e,
1792+
@Nullable final List<ConsumerRecord<K, V>> recordList, RuntimeException e,
17931793
AfterRollbackProcessor<K, V> afterRollbackProcessorToUse) {
17941794

17951795
RuntimeException rollbackException = decorateException(e);
@@ -1888,10 +1888,10 @@ private void batchInterceptAfter(ConsumerRecords<K, V> records, @Nullable Except
18881888
if (this.commonBatchInterceptor != null) {
18891889
try {
18901890
if (exception == null) {
1891-
this.commonBatchInterceptor.success(records);
1891+
this.commonBatchInterceptor.success(records, this.consumer);
18921892
}
18931893
else {
1894-
this.commonBatchInterceptor.failure(records, exception);
1894+
this.commonBatchInterceptor.failure(records, exception, this.consumer);
18951895
}
18961896
}
18971897
catch (Exception e) {
@@ -1961,7 +1961,7 @@ private void invokeBatchOnMessageWithRecordsOrList(final ConsumerRecords<K, V> r
19611961

19621962
ConsumerRecords<K, V> records = recordsArg;
19631963
if (this.batchInterceptor != null) {
1964-
records = this.batchInterceptor.intercept(recordsArg);
1964+
records = this.batchInterceptor.intercept(recordsArg, this.consumer);
19651965
}
19661966
if (this.wantsFullRecords) {
19671967
this.batchListener.onMessage(records, // NOSONAR
@@ -2143,7 +2143,7 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
21432143
private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg) {
21442144
ConsumerRecords<K, V> next = nextArg;
21452145
if (this.earlyBatchInterceptor != null) {
2146-
next = this.earlyBatchInterceptor.intercept(next);
2146+
next = this.earlyBatchInterceptor.intercept(next, this.consumer);
21472147
if (next == null) {
21482148
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
21492149
+ nextArg + " with " + nextArg.count() + " records");
@@ -2156,7 +2156,7 @@ private ConsumerRecords<K, V> checkEarlyIntercept(ConsumerRecords<K, V> nextArg)
21562156
private ConsumerRecord<K, V> checkEarlyIntercept(ConsumerRecord<K, V> nextArg) {
21572157
ConsumerRecord<K, V> next = nextArg;
21582158
if (this.earlyRecordInterceptor != null) {
2159-
next = this.earlyRecordInterceptor.intercept(next);
2159+
next = this.earlyRecordInterceptor.intercept(next, this.consumer);
21602160
if (next == null) {
21612161
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "
21622162
+ ListenerUtils.recordToString(nextArg));
@@ -2258,10 +2258,10 @@ private void recordInterceptAfter(ConsumerRecord<K, V> records, @Nullable Except
22582258
if (this.commonRecordInterceptor != null) {
22592259
try {
22602260
if (exception == null) {
2261-
this.commonRecordInterceptor.success(records);
2261+
this.commonRecordInterceptor.success(records, this.consumer);
22622262
}
22632263
else {
2264-
this.commonRecordInterceptor.failure(records, exception);
2264+
this.commonRecordInterceptor.failure(records, exception, this.consumer);
22652265
}
22662266
}
22672267
catch (Exception e) {
@@ -2301,7 +2301,7 @@ private void invokeOnMessage(final ConsumerRecord<K, V> record) {
23012301
private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
23022302
ConsumerRecord<K, V> record = recordArg;
23032303
if (this.recordInterceptor != null) {
2304-
record = this.recordInterceptor.intercept(record);
2304+
record = this.recordInterceptor.intercept(record, this.consumer);
23052305
}
23062306
if (record == null) {
23072307
this.logger.debug(() -> "RecordInterceptor returned null, skipping: "

spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import org.apache.kafka.clients.consumer.Consumer;
1920
import org.apache.kafka.clients.consumer.ConsumerRecord;
2021

2122
import org.springframework.lang.Nullable;
@@ -39,25 +40,45 @@ public interface RecordInterceptor<K, V> {
3940
* the record will be skipped. Invoked before the listener.
4041
* @param record the record.
4142
* @return the record or null.
43+
* @deprecated in favor of {@link #intercept(ConsumerRecord, Consumer)} which will
44+
* become the required method in a future release.
4245
*/
46+
@Deprecated
4347
@Nullable
4448
ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record);
4549

50+
/**
51+
* Perform some action on the record or return a different one. If null is returned
52+
* the record will be skipped. Invoked before the listener.
53+
* @param record the record.
54+
* @param consumer the consumer.
55+
* @return the record or null.
56+
* @since 2.7
57+
*/
58+
@Nullable
59+
default ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record,
60+
@SuppressWarnings("unused") Consumer<K, V> consumer) {
61+
62+
return intercept(record);
63+
}
64+
4665
/**
4766
* Called after the listener exits normally.
4867
* @param record the record.
68+
* @param consumer the consumer.
4969
* @since 2.7
5070
*/
51-
default void success(ConsumerRecord<K, V> record) {
71+
default void success(ConsumerRecord<K, V> record, Consumer<K, V> consumer) {
5272
}
5373

5474
/**
5575
* Called after the listener throws an exception.
5676
* @param record the record.
5777
* @param exception the exception.
78+
* @param consumer the consumer.
5879
* @since 2.7
5980
*/
60-
default void failure(ConsumerRecord<K, V> record, Exception exception) {
81+
default void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K, V> consumer) {
6182
}
6283

6384
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1084,7 +1084,7 @@ public KafkaListenerContainerFactory<?> batchFactory() {
10841084
factory.setRecordFilterStrategy(recordFilter());
10851085
// always send to the same partition so the replies are in order for the test
10861086
factory.setReplyTemplate(partitionZeroReplyTemplate());
1087-
factory.setBatchInterceptor(records -> {
1087+
factory.setBatchInterceptor((records, consumer) -> {
10881088
this.batchIntercepted = true;
10891089
return records;
10901090
});

0 commit comments

Comments
 (0)