Skip to content

Commit b671667

Browse files
committed
consumer retries info#2
1 parent 759f2ae commit b671667

File tree

2 files changed

+19
-11
lines changed

2 files changed

+19
-11
lines changed

README.adoc

+14-7
Original file line numberDiff line numberDiff line change
@@ -775,8 +775,8 @@ Create topics:
775775

776776
[source,bash]
777777
----
778-
docker exec broker kafka-topics --bootstrap-server broker:9092 --create --topic input-topic --replication-factor 1 --partitions 1
779778
docker exec broker kafka-topics --bootstrap-server broker:9092 --create --topic retry-topic --replication-factor 1 --partitions 1
779+
docker exec broker kafka-topics --bootstrap-server broker:9092 --create --topic dlq-topic --replication-factor 1 --partitions 1
780780
----
781781

782782
Run consumer managing retry topics:
@@ -803,12 +803,19 @@ Verify in consumer log if messages are sent to retry and dlq topics:
803803

804804
[source,bash]
805805
----
806-
Group id eb35b080-6cc7-4934-9363-fcf1bcfcaef7 - Consumer id: consumer-eb35b080-6cc7-4934-9363-fcf1bcfcaef7-1-56d0ad79-8496-4410-a8c1-3519a7605baf - Topic: input-topic - Partition: 0 - Offset: 0 - Key: alice - Value: {col_foo:1}
807-
Group id eb35b080-6cc7-4934-9363-fcf1bcfcaef7 - Consumer id: consumer-eb35b080-6cc7-4934-9363-fcf1bcfcaef7-1-56d0ad79-8496-4410-a8c1-3519a7605baf - Topic: input-topic - Partition: 0 - Offset: 1 - Key: alice - Value: {col_foo:1}
808-
Error message detected: number of retries left 3
809-
Group id eb35b080-6cc7-4934-9363-fcf1bcfcaef7 - Consumer id: consumer-eb35b080-6cc7-4934-9363-fcf1bcfcaef7-1-56d0ad79-8496-4410-a8c1-3519a7605baf - Topic: input-topic - Partition: 0 - Offset: 2 - Key: alice - Value: {col_foo:1}
810-
Error message detected: number of retries left 3
811-
send to RETRY topic: retry_topic
806+
Group id c9a19a62-0284-4251-be22-5d691243646a - Consumer id: consumer-c9a19a62-0284-4251-be22-5d691243646a-1-86fb972e-b5c8-4621-8464-9c1a747a920b - Topic: input-topic - Partition: 0 - Offset: 0 - Key: alice - Value: {col_foo:1}
807+
Group id c9a19a62-0284-4251-be22-5d691243646a - Consumer id: consumer-c9a19a62-0284-4251-be22-5d691243646a-1-86fb972e-b5c8-4621-8464-9c1a747a920b - Topic: input-topic - Partition: 0 - Offset: 1 - Key: alice - Value: {col_foo:1}
808+
Error message detected: number of retries 3 left for key alice
809+
send to RETRY topic: retry-topic
810+
Group id c9a19a62-0284-4251-be22-5d691243646a - Consumer id: consumer-c9a19a62-0284-4251-be22-5d691243646a-1-86fb972e-b5c8-4621-8464-9c1a747a920b - Topic: input-topic - Partition: 0 - Offset: 2 - Key: alice - Value: {col_foo:1}
811+
Error message detected: number of retries 2 left for key alice
812+
send to RETRY topic: retry-topic
813+
Group id c9a19a62-0284-4251-be22-5d691243646a - Consumer id: consumer-c9a19a62-0284-4251-be22-5d691243646a-1-86fb972e-b5c8-4621-8464-9c1a747a920b - Topic: input-topic - Partition: 0 - Offset: 3 - Key: alice - Value: {col_foo:1}
814+
Error message detected: number of retries 1 left for key alice
815+
send to RETRY topic: retry-topic
816+
Group id c9a19a62-0284-4251-be22-5d691243646a - Consumer id: consumer-c9a19a62-0284-4251-be22-5d691243646a-1-86fb972e-b5c8-4621-8464-9c1a747a920b - Topic: input-topic - Partition: 0 - Offset: 4 - Key: alice - Value: {col_foo:1}
817+
Error message detected: number of retries 0 left for key alice
818+
number of retries exhausted, send to DLQ topic: dlq-topic
812819
----
813820

814821
=== Interceptor

kafka-consumer-retry-topics/src/main/java/org/hifly/kafka/demo/consumer/retry/RetryHandle.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@
1111

1212
import java.math.BigInteger;
1313
import java.util.*;
14+
import java.util.concurrent.ConcurrentHashMap;
1415

1516
public class RetryHandle<K,V> extends ConsumerHandle<K,V> {
1617

17-
private final String RETRY_TOPIC = "retry_topic";
18-
private final String DLQ_TOPIC = "dlq_topic";
18+
private final String RETRY_TOPIC = "retry-topic";
19+
private final String DLQ_TOPIC = "dlq-topic";
1920

20-
private Map<String, Integer> retriesPerKey = new HashMap<>();
21+
private Map<String, Integer> retriesPerKey = new ConcurrentHashMap<>();
2122

2223
private int retries;
2324

@@ -36,9 +37,9 @@ public void process(ConsumerRecords<K, V> consumerRecords, String groupId, Strin
3637
for (Header recordHeader : record.headers()) {
3738
//This is only a sample way to detect if a record should be a candidate for retries. Real use-case will fail on same business logic
3839
if (recordHeader.key().equals("ERROR")) {
39-
System.out.printf("Error message detected: number of retries left %s\n", retries);
4040
if (retriesPerKey.containsKey(record.key())) {
4141
Integer retryValue = retriesPerKey.get(record.key());
42+
System.out.printf("Error message detected: number of retries %s left for key %s\n", retryValue, record.key());
4243
if (retryValue > 0) {
4344
System.out.printf("send to RETRY topic: %s\n", RETRY_TOPIC);
4445
//move to retry topic

0 commit comments

Comments
 (0)