Commit df6e4b1

garyrussell authored and artembilan committed
GH-1749: Fix DLPR Send Timeout
Resolves #1749. We cannot time out before the `KafkaProducer`; otherwise we could report a timeout on a send that actually succeeds. Ensure that the send timeout is at least a few seconds longer than the producer timeout.
1 parent 7badf42 commit df6e4b1

3 files changed (+135, -7 lines)

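The gist of the fix: the recoverer's effective send timeout becomes the larger of the configured wait time and the producer's delivery.timeout.ms plus a buffer, so the recoverer can never give up before the producer itself does. A minimal sketch of that rule in plain Java (the numbers below are illustrative, not taken from the commit):

public class SendTimeoutRuleSketch {

	public static void main(String[] args) {
		long deliveryTimeoutMs = 120_000L;  // producer delivery.timeout.ms (Kafka default)
		long bufferMs = 5_000L;             // the new timeoutBuffer, default 5 seconds
		long waitForSendResultMs = 1_000L;  // configured waitForSendResultTimeout
		// Never wait less than the producer's own delivery timeout plus the buffer.
		long effectiveTimeoutMs = Math.max(deliveryTimeoutMs + bufferMs, waitForSendResultMs);
		System.out.println(effectiveTimeoutMs); // 125000 ms, i.e. 125 seconds
	}
}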

spring-kafka/src/main/java/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.java

Lines changed: 36 additions & 5 deletions
@@ -44,7 +44,9 @@
 import org.springframework.core.log.LogAccessor;
 import org.springframework.kafka.KafkaException;
 import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.KafkaUtils;
 import org.springframework.kafka.support.SendResult;
 import org.springframework.kafka.support.serializer.DeserializationException;
 import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
@@ -97,6 +99,7 @@ public class DeadLetterPublishingRecoverer implements ConsumerAwareRecordRecover
 
 	private boolean throwIfNoDestinationReturned = false;
 
+	private long timeoutBuffer = Duration.ofSeconds(FIVE).toMillis();
 
 	/**
 	 * Create an instance with the provided template and a default destination resolving
@@ -274,15 +277,28 @@ public void setFailIfSendResultIsError(boolean failIfSendResultIsError) {
 	}
 
 	/**
-	 * Time to wait for message sending. Default is 30 seconds.
+	 * Set the minumum time to wait for message sending. Default is the producer
+	 * configuration {@code delivery.timeout.ms} plus the {@link #setTimeoutBuffer(long)}.
 	 * @param waitForSendResultTimeout the timeout.
 	 * @since 2.7
 	 * @see #setFailIfSendResultIsError(boolean)
+	 * @see #setTimeoutBuffer(long)
 	 */
 	public void setWaitForSendResultTimeout(Duration waitForSendResultTimeout) {
 		this.waitForSendResultTimeout = waitForSendResultTimeout;
 	}
 
+	/**
+	 * Set the number of milliseconds to add to the producer configuration {@code delivery.timeout.ms}
+	 * property to avoid timing out before the Kafka producer. Default 5000.
+	 * @param buffer the buffer.
+	 * @since 2.7
+	 * @see #setWaitForSendResultTimeout(Duration)
+	 */
+	public void setTimeoutBuffer(long buffer) {
+		this.timeoutBuffer = buffer;
+	}
+
 	@SuppressWarnings("unchecked")
 	@Override
 	public void accept(ConsumerRecord<?, ?> record, @Nullable Consumer<?, ?> consumer, Exception exception) {
@@ -447,17 +463,20 @@ protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations
 			this.logger.error(e, () -> "Dead-letter publication failed for: " + outRecord);
 		}
 		if (this.failIfSendResultIsError) {
-			verifySendResult(outRecord, sendResult);
+			verifySendResult(kafkaTemplate, outRecord, sendResult);
 		}
 	}
 
-	private void verifySendResult(ProducerRecord<Object, Object> outRecord,
-			@Nullable ListenableFuture<SendResult<Object, Object>> sendResult) {
+	private void verifySendResult(KafkaOperations<Object, Object> kafkaTemplate,
+			ProducerRecord<Object, Object> outRecord,
+			@Nullable ListenableFuture<SendResult<Object, Object>> sendResult) {
+
+		Duration sendTimeout = determineSendTimeout(kafkaTemplate);
 		if (sendResult == null) {
 			throw new KafkaException("Dead-letter publication failed for: " + outRecord);
 		}
 		try {
-			sendResult.get(this.waitForSendResultTimeout.toMillis(), TimeUnit.MILLISECONDS);
+			sendResult.get(sendTimeout.toMillis(), TimeUnit.MILLISECONDS);
 		}
 		catch (InterruptedException e) {
 			Thread.currentThread().interrupt();
@@ -468,6 +487,18 @@ private void verifySendResult(ProducerRecord<Object, Object> outRecord,
 		}
 	}
 
+	private Duration determineSendTimeout(KafkaOperations<?, ?> template) {
+		ProducerFactory<? extends Object, ? extends Object> producerFactory = template.getProducerFactory();
+		if (producerFactory != null) { // NOSONAR - will only occur in mock tests
+			Map<String, Object> props = producerFactory.getConfigurationProperties();
+			if (props != null) { // NOSONAR - will only occur in mock tests
+				return KafkaUtils.determineSendTimeout(props, this.timeoutBuffer,
+						this.waitForSendResultTimeout.toMillis());
+			}
+		}
+		return Duration.ofSeconds(THIRTY);
+	}
+
 	private void enhanceHeaders(Headers kafkaHeaders, ConsumerRecord<?, ?> record, Exception exception) {
 		maybeAddOriginalHeaders(kafkaHeaders, record);
 		addExceptionInfoHeaders(kafkaHeaders, exception, false);
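For orientation, a sketch of how the new setter might be used when building a recoverer; the template wiring and the 10-second buffer here are assumptions for illustration, not part of this commit:

import java.time.Duration;

import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;

public class DlprConfigSketch {

	// Build a recoverer that fails the recovery if the DLT send fails,
	// waiting at least delivery.timeout.ms + 10s for the send result.
	public static DeadLetterPublishingRecoverer recoverer(KafkaOperations<Object, Object> template) {
		DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
		recoverer.setFailIfSendResultIsError(true);
		recoverer.setWaitForSendResultTimeout(Duration.ofSeconds(1));
		recoverer.setTimeoutBuffer(Duration.ofSeconds(10).toMillis());
		return recoverer;
	}
}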

spring-kafka/src/main/java/org/springframework/kafka/support/KafkaUtils.java

Lines changed: 38 additions & 1 deletion
@@ -1,5 +1,5 @@
 /*
- * Copyright 2018-2020 the original author or authors.
+ * Copyright 2018-2021 the original author or authors.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,7 +19,11 @@
 import java.lang.reflect.Method;
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
+import java.time.Duration;
 import java.util.Collection;
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
 
 import org.springframework.messaging.Message;
 import org.springframework.util.ClassUtils;
@@ -98,6 +102,39 @@ public static void clearConsumerGroupId() {
 		KafkaUtils.GROUP_IDS.remove();
 	}
 
+	/**
+	 * Return the timeout to use when sending records. If the
+	 * {@link ProducerConfig#DELIVERY_TIMEOUT_MS_CONFIG} is not configured, or is not a
+	 * number or a String that can be parsed as a long, the {@link ProducerConfig} default
+	 * value (plus the buffer) is used.
+	 * @param producerProps the producer properties.
+	 * @param buffer a buffer to add to the configured
+	 * {@link ProducerConfig#DELIVERY_TIMEOUT_MS_CONFIG} to prevent timing out before the
+	 * Kafka producer.
+	 * @param min a minimum value to apply after adding the buffer to the configured
+	 * timeout.
+	 * @return the timeout to use.
+	 * @since 2.7
+	 */
+	public static Duration determineSendTimeout(Map<String, Object> producerProps, long buffer, long min) {
+		Object dt = producerProps.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG);
+		if (dt instanceof Number) {
+			return Duration.ofMillis(Math.max(((Number) dt).longValue() + buffer, min));
+		}
+		else if (dt instanceof String) {
+			try {
+				return Duration.ofMillis(Math.max(Long.parseLong((String) dt) + buffer, min));
+			}
+			catch (@SuppressWarnings("unused") NumberFormatException ex) {
+			}
+		}
+		return Duration.ofMillis(Math.max(
+				((Integer) ProducerConfig.configDef().defaultValues()
+						.get(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG)).longValue() + buffer,
+				min));
+	}
+
 	private KafkaUtils() {
 	}
 
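As a quick check of the new helper, a hedged example call (the configured property value below is an assumption; the arguments mirror what the recoverer passes in):

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;

import org.springframework.kafka.support.KafkaUtils;

public class DetermineSendTimeoutExample {

	public static void main(String[] args) {
		Map<String, Object> props = new HashMap<>();
		props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30_000);
		// 30s configured delivery timeout + 5s buffer = 35s, above the 1s minimum.
		Duration timeout = KafkaUtils.determineSendTimeout(props, 5_000L, 1_000L);
		System.out.println(timeout); // PT35S
	}
}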

spring-kafka/src/test/java/org/springframework/kafka/listener/DeadLetterPublishingRecovererTests.java

Lines changed: 61 additions & 1 deletion
@@ -27,6 +27,7 @@
 import static org.mockito.BDDMockito.willReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -36,13 +37,15 @@
 import java.io.UncheckedIOException;
 import java.time.Duration;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.header.Header;
@@ -56,7 +59,9 @@
 import org.springframework.kafka.KafkaException;
 import org.springframework.kafka.core.KafkaOperations;
 import org.springframework.kafka.core.KafkaOperations.OperationsCallback;
+import org.springframework.kafka.core.ProducerFactory;
 import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.kafka.support.SendResult;
 import org.springframework.kafka.support.serializer.DeserializationException;
 import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
 import org.springframework.util.concurrent.ListenableFuture;
@@ -357,10 +362,15 @@ void replaceOriginalHeaders() {
 		assertThat(anotherHeaders.lastHeader(KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE)).isNotEqualTo(originalTimestampType);
 	}
 
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Test
 	void failIfSendResultIsError() throws Exception {
 		KafkaOperations<?, ?> template = mock(KafkaOperations.class);
+		ProducerFactory pf = mock(ProducerFactory.class);
+		Map<String, Object> props = new HashMap<>();
+		props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10L);
+		given(pf.getConfigurationProperties()).willReturn(props);
+		given(template.getProducerFactory()).willReturn(pf);
 		ListenableFuture<?> future = mock(ListenableFuture.class);
 		ArgumentCaptor<Long> timeoutCaptor = ArgumentCaptor.forClass(Long.class);
 		given(template.send(any(ProducerRecord.class))).willReturn(future);
@@ -370,11 +380,61 @@
 		recoverer.setFailIfSendResultIsError(true);
 		Duration waitForSendResultTimeout = Duration.ofSeconds(1);
 		recoverer.setWaitForSendResultTimeout(waitForSendResultTimeout);
+		recoverer.setTimeoutBuffer(0L);
 		assertThatThrownBy(() -> recoverer.accept(record, new RuntimeException()))
 				.isExactlyInstanceOf(KafkaException.class);
 		assertThat(timeoutCaptor.getValue()).isEqualTo(waitForSendResultTimeout.toMillis());
 	}
 
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	void sendTimeoutDefault() throws Exception {
+		KafkaOperations<?, ?> template = mock(KafkaOperations.class);
+		ProducerFactory pf = mock(ProducerFactory.class);
+		Map<String, Object> props = new HashMap<>();
+		given(pf.getConfigurationProperties()).willReturn(props);
+		given(template.getProducerFactory()).willReturn(pf);
+		SettableListenableFuture<SendResult> future = spy(new SettableListenableFuture<>());
+		ArgumentCaptor<Long> timeoutCaptor = ArgumentCaptor.forClass(Long.class);
+		given(template.send(any(ProducerRecord.class))).willReturn(future);
+		willAnswer(inv -> {
+			future.set(new SendResult(null, null));
+			return null;
+		}).given(future).get(timeoutCaptor.capture(), eq(TimeUnit.MILLISECONDS));
+		ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
+		DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
+		recoverer.setFailIfSendResultIsError(true);
+		Duration waitForSendResultTimeout = Duration.ofSeconds(1);
+		recoverer.setWaitForSendResultTimeout(waitForSendResultTimeout);
+		recoverer.accept(record, new RuntimeException());
+		assertThat(timeoutCaptor.getValue()).isEqualTo(Duration.ofSeconds(125).toMillis());
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	@Test
+	void sendTimeoutConfig() throws Exception {
+		KafkaOperations<?, ?> template = mock(KafkaOperations.class);
+		ProducerFactory pf = mock(ProducerFactory.class);
+		Map<String, Object> props = new HashMap<>();
+		props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30_000L);
+		given(pf.getConfigurationProperties()).willReturn(props);
+		given(template.getProducerFactory()).willReturn(pf);
+		SettableListenableFuture<SendResult> future = spy(new SettableListenableFuture<>());
+		ArgumentCaptor<Long> timeoutCaptor = ArgumentCaptor.forClass(Long.class);
+		given(template.send(any(ProducerRecord.class))).willReturn(future);
+		willAnswer(inv -> {
+			future.set(new SendResult(null, null));
+			return null;
+		}).given(future).get(timeoutCaptor.capture(), eq(TimeUnit.MILLISECONDS));
+		ConsumerRecord<String, String> record = new ConsumerRecord<>("foo", 0, 0L, "bar", null);
+		DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
+		recoverer.setFailIfSendResultIsError(true);
+		Duration waitForSendResultTimeout = Duration.ofSeconds(1);
+		recoverer.setWaitForSendResultTimeout(waitForSendResultTimeout);
+		recoverer.accept(record, new RuntimeException());
+		assertThat(timeoutCaptor.getValue()).isEqualTo(Duration.ofSeconds(35).toMillis());
+	}
+
 	@SuppressWarnings("unchecked")
 	@Test
 	void notFailIfSendResultIsError() throws Exception {
