@@ -295,7 +295,18 @@ public void setFailIfSendResultIsError(boolean failIfSendResultIsError) {
295295 }
296296
297297 /**
298- * Set the minumum time to wait for message sending. Default is the producer
298+ * If true, wait for the send result and throw an exception if it fails.
299+ * It will wait for the milliseconds specified in waitForSendResultTimeout for the result.
300+ * @return true to wait.
301+ * @since 2.7.14
302+ * @see #setWaitForSendResultTimeout(Duration)
303+ */
304+ protected boolean isFailIfSendResultIsError () {
305+ return this .failIfSendResultIsError ;
306+ }
307+
308+ /**
309+ * Set the minimum time to wait for message sending. Default is the producer
299310 * configuration {@code delivery.timeout.ms} plus the {@link #setTimeoutBuffer(long)}.
300311 * @param waitForSendResultTimeout the timeout.
301312 * @since 2.7
@@ -307,8 +318,9 @@ public void setWaitForSendResultTimeout(Duration waitForSendResultTimeout) {
307318 }
308319
309320 /**
310- * Set the number of milliseconds to add to the producer configuration {@code delivery.timeout.ms}
311- * property to avoid timing out before the Kafka producer. Default 5000.
321+ * Set the number of milliseconds to add to the producer configuration
322+ * {@code delivery.timeout.ms} property to avoid timing out before the Kafka producer.
323+ * Default 5000.
312324 * @param buffer the buffer.
313325 * @since 2.7
314326 * @see #setWaitForSendResultTimeout(Duration)
@@ -317,6 +329,16 @@ public void setTimeoutBuffer(long buffer) {
317329 this .timeoutBuffer = buffer ;
318330 }
319331
332+ /**
333+ * The number of milliseconds to add to the producer configuration
334+ * {@code delivery.timeout.ms} property to avoid timing out before the Kafka producer.
335+ * @return the buffer.
336+ * @since 2.7.14
337+ */
338+ protected long getTimeoutBuffer () {
339+ return this .timeoutBuffer ;
340+ }
341+
320342 /**
321343 * Set to false to retain previous exception headers as well as headers for the
322344 * current exception. Default is true, which means only the current headers are
@@ -351,6 +373,15 @@ public void setExceptionHeadersCreator(ExceptionHeadersCreator headersCreator) {
351373 this .exceptionHeadersCreator = headersCreator ;
352374 }
353375
376+ /**
377+ * True if publishing should run in a transaction.
378+ * @return true for transactional.
379+ * @since 2.7.14
380+ */
381+ protected boolean isTransactional () {
382+ return this .transactional ;
383+ }
384+
354385 /**
355386 * Clear the header inclusion bit for the header name.
356387 * @param headers the headers to clear.
@@ -612,7 +643,14 @@ protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations
612643 }
613644 }
614645
615- private void verifySendResult (KafkaOperations <Object , Object > kafkaTemplate ,
646+ /**
647+ * Wait for the send future to complete.
648+ * @param kafkaTemplate the template used to send the record.
649+ * @param outRecord the record.
650+ * @param sendResult the future.
651+ * @param inRecord the original consumer record.
652+ */
653+ protected void verifySendResult (KafkaOperations <Object , Object > kafkaTemplate ,
616654 ProducerRecord <Object , Object > outRecord ,
617655 @ Nullable ListenableFuture <SendResult <Object , Object >> sendResult , ConsumerRecord <?, ?> inRecord ) {
618656
@@ -637,7 +675,14 @@ private String pubFailMessage(ProducerRecord<Object, Object> outRecord, Consumer
637675 + outRecord .topic () + " failed for: " + KafkaUtils .format (inRecord );
638676 }
639677
640- private Duration determineSendTimeout (KafkaOperations <?, ?> template ) {
678+ /**
679+ * Determine the send timeout based on the template's producer factory and
680+ * {@link #setWaitForSendResultTimeout(Duration)}.
681+ * @param template the template.
682+ * @return the timeout.
683+ * @since 2.7.14
684+ */
685+ protected Duration determineSendTimeout (KafkaOperations <?, ?> template ) {
641686 ProducerFactory <? extends Object , ? extends Object > producerFactory = template .getProducerFactory ();
642687 if (producerFactory != null ) { // NOSONAR - will only occur in mock tests
643688 Map <String , Object > props = producerFactory .getConfigurationProperties ();
0 commit comments