@@ -203,7 +203,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
203
203
204
204
private volatile CompletableFuture <Producer <byte []>> deadLetterProducer ;
205
205
206
- private volatile Producer <byte []> retryLetterProducer ;
206
+ private volatile CompletableFuture < Producer <byte []> > retryLetterProducer ;
207
207
private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock ();
208
208
209
209
protected volatile boolean paused ;
@@ -643,6 +643,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
643
643
Map <String , String > customProperties ,
644
644
long delayTime ,
645
645
TimeUnit unit ) {
646
+
646
647
MessageId messageId = message .getMessageId ();
647
648
if (messageId == null ) {
648
649
return FutureUtil .failedFuture (new PulsarClientException
@@ -659,29 +660,8 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
659
660
}
660
661
return FutureUtil .failedFuture (exception );
661
662
}
662
- if (delayTime < 0 ) {
663
- delayTime = 0 ;
664
- }
665
- if (retryLetterProducer == null ) {
666
- createProducerLock .writeLock ().lock ();
667
- try {
668
- if (retryLetterProducer == null ) {
669
- retryLetterProducer = client .newProducer (Schema .AUTO_PRODUCE_BYTES (schema ))
670
- .topic (this .deadLetterPolicy .getRetryLetterTopic ())
671
- .enableBatching (false )
672
- .enableChunking (true )
673
- .blockIfQueueFull (false )
674
- .create ();
675
- stats .setRetryLetterProducerStats (retryLetterProducer .getStats ());
676
- }
677
- } catch (Exception e ) {
678
- log .error ("Create retry letter producer exception with topic: {}" ,
679
- deadLetterPolicy .getRetryLetterTopic (), e );
680
- return FutureUtil .failedFuture (e );
681
- } finally {
682
- createProducerLock .writeLock ().unlock ();
683
- }
684
- }
663
+
664
+ initRetryLetterProducerIfNeeded ();
685
665
CompletableFuture <Void > result = new CompletableFuture <>();
686
666
if (retryLetterProducer != null ) {
687
667
try {
@@ -701,7 +681,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
701
681
}
702
682
propertiesMap .put (RetryMessageUtil .SYSTEM_PROPERTY_RECONSUMETIMES , String .valueOf (reconsumeTimes ));
703
683
propertiesMap .put (RetryMessageUtil .SYSTEM_PROPERTY_DELAY_TIME ,
704
- String .valueOf (unit .toMillis (delayTime )));
684
+ String .valueOf (unit .toMillis (delayTime < 0 ? 0 : delayTime )));
705
685
706
686
MessageId finalMessageId = messageId ;
707
687
if (reconsumeTimes > this .deadLetterPolicy .getMaxRedeliverCount ()
@@ -732,23 +712,29 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
732
712
});
733
713
} else {
734
714
assert retryMessage != null ;
735
- TypedMessageBuilder <byte []> typedMessageBuilderNew = retryLetterProducer
736
- .newMessage (Schema .AUTO_PRODUCE_BYTES (message .getReaderSchema ().get ()))
737
- .value (retryMessage .getData ())
738
- .properties (propertiesMap );
739
- if (delayTime > 0 ) {
740
- typedMessageBuilderNew .deliverAfter (delayTime , unit );
741
- }
742
- if (message .hasKey ()) {
743
- typedMessageBuilderNew .key (message .getKey ());
744
- }
745
- typedMessageBuilderNew .sendAsync ()
746
- .thenCompose (__ -> doAcknowledge (finalMessageId , ackType , Collections .emptyMap (), null ))
747
- .thenAccept (v -> result .complete (null ))
748
- .exceptionally (ex -> {
749
- result .completeExceptionally (ex );
750
- return null ;
751
- });
715
+ retryLetterProducer .thenAcceptAsync (rtlProducer -> {
716
+ TypedMessageBuilder <byte []> typedMessageBuilderNew = rtlProducer
717
+ .newMessage (Schema .AUTO_PRODUCE_BYTES (message .getReaderSchema ().get ()))
718
+ .value (retryMessage .getData ())
719
+ .properties (propertiesMap );
720
+ if (delayTime > 0 ) {
721
+ typedMessageBuilderNew .deliverAfter (delayTime , unit );
722
+ }
723
+ if (message .hasKey ()) {
724
+ typedMessageBuilderNew .key (message .getKey ());
725
+ }
726
+ typedMessageBuilderNew .sendAsync ()
727
+ .thenCompose (__ -> doAcknowledge (finalMessageId , ackType , Collections .emptyMap (), null ))
728
+ .thenAccept (v -> result .complete (null ))
729
+ .exceptionally (ex -> {
730
+ result .completeExceptionally (ex );
731
+ return null ;
732
+ });
733
+ }, internalPinnedExecutor ).exceptionally (ex -> {
734
+ result .completeExceptionally (ex );
735
+ retryLetterProducer = null ;
736
+ return null ;
737
+ });
752
738
}
753
739
} catch (Exception e ) {
754
740
result .completeExceptionally (e );
@@ -757,7 +743,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
757
743
MessageId finalMessageId = messageId ;
758
744
result .exceptionally (ex -> {
759
745
log .error ("Send to retry letter topic exception with topic: {}, messageId: {}" ,
760
- retryLetterProducer . getTopic (), finalMessageId , ex );
746
+ this . deadLetterPolicy . getRetryLetterTopic (), finalMessageId , ex );
761
747
Set <MessageId > messageIds = Collections .singleton (finalMessageId );
762
748
unAckedMessageTracker .remove (finalMessageId );
763
749
redeliverUnacknowledgedMessages (messageIds );
@@ -1136,7 +1122,7 @@ public synchronized CompletableFuture<Void> closeAsync() {
1136
1122
ArrayList <CompletableFuture <Void >> closeFutures = new ArrayList <>(4 );
1137
1123
closeFutures .add (closeFuture );
1138
1124
if (retryLetterProducer != null ) {
1139
- closeFutures .add (retryLetterProducer .closeAsync ().whenComplete ((ignore , ex ) -> {
1125
+ closeFutures .add (retryLetterProducer .thenCompose ( p -> p . closeAsync () ).whenComplete ((ignore , ex ) -> {
1140
1126
if (ex != null ) {
1141
1127
log .warn ("Exception ignored in closing retryLetterProducer of consumer" , ex );
1142
1128
}
@@ -2267,6 +2253,28 @@ private void initDeadLetterProducerIfNeeded() {
2267
2253
}
2268
2254
}
2269
2255
2256
+ private void initRetryLetterProducerIfNeeded () {
2257
+ if (retryLetterProducer == null ) {
2258
+ createProducerLock .writeLock ().lock ();
2259
+ try {
2260
+ if (retryLetterProducer == null ) {
2261
+ retryLetterProducer = client
2262
+ .newProducer (Schema .AUTO_PRODUCE_BYTES (schema ))
2263
+ .topic (this .deadLetterPolicy .getRetryLetterTopic ())
2264
+ .enableBatching (false )
2265
+ .enableChunking (true )
2266
+ .blockIfQueueFull (false )
2267
+ .createAsync ();
2268
+ retryLetterProducer .thenAccept (rtlProducer -> {
2269
+ stats .setRetryLetterProducerStats (rtlProducer .getStats ());
2270
+ });
2271
+ }
2272
+ } finally {
2273
+ createProducerLock .writeLock ().unlock ();
2274
+ }
2275
+ }
2276
+ }
2277
+
2270
2278
@ Override
2271
2279
public void seek (MessageId messageId ) throws PulsarClientException {
2272
2280
try {
0 commit comments