|
71 | 71 | import org.junit.jupiter.api.AfterAll; |
72 | 72 | import org.junit.jupiter.api.BeforeAll; |
73 | 73 | import org.junit.jupiter.api.Test; |
| 74 | +import org.junit.jupiter.params.ParameterizedTest; |
| 75 | +import org.junit.jupiter.params.provider.NullSource; |
| 76 | +import org.junit.jupiter.params.provider.ValueSource; |
74 | 77 | import org.mockito.InOrder; |
75 | 78 | import org.mockito.Mockito; |
76 | 79 |
|
|
104 | 107 | * @author Endika Gutierrez |
105 | 108 | * @author Thomas Strauß |
106 | 109 | * @author Soby Chacko |
| 110 | + * @author Gurps Bassi |
107 | 111 | */ |
108 | 112 | @EmbeddedKafka(topics = { KafkaTemplateTests.INT_KEY_TOPIC, KafkaTemplateTests.STRING_KEY_TOPIC }) |
109 | 113 | public class KafkaTemplateTests { |
@@ -163,6 +167,12 @@ void testTemplate() { |
163 | 167 |
|
164 | 168 | template.setDefaultTopic(INT_KEY_TOPIC); |
165 | 169 |
|
| 170 | + template.setConsumerFactory( |
| 171 | + new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka))); |
| 172 | + ConsumerRecords<Integer, String> initialRecords = |
| 173 | + template.receive(Collections.singleton(new TopicPartitionOffset(INT_KEY_TOPIC, 1, 1L))); |
| 174 | + assertThat(initialRecords).isEmpty(); |
| 175 | + |
166 | 176 | template.sendDefault("foo"); |
167 | 177 | assertThat(KafkaTestUtils.getSingleRecord(consumer, INT_KEY_TOPIC)).has(value("foo")); |
168 | 178 |
|
@@ -201,8 +211,7 @@ void testTemplate() { |
201 | 211 | assertThat(partitions).isNotNull(); |
202 | 212 | assertThat(partitions).hasSize(2); |
203 | 213 | assertThat(KafkaTestUtils.getPropertyValue(pf.createProducer(), "delegate")).isSameAs(wrapped.get()); |
204 | | - template.setConsumerFactory( |
205 | | - new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka))); |
| 214 | + |
206 | 215 | ConsumerRecord<Integer, String> receive = template.receive(INT_KEY_TOPIC, 1, received.offset()); |
207 | 216 | assertThat(receive).has(allOf(keyValue(2, "buz"), partition(1))) |
208 | 217 | .extracting(ConsumerRecord::offset) |
@@ -621,4 +630,22 @@ void testCompositeProducerInterceptor() { |
621 | 630 | inOrder.verify(producerInterceptor2).onAcknowledgement(any(RecordMetadata.class), Mockito.isNull()); |
622 | 631 | } |
623 | 632 |
|
| 633 | + @ParameterizedTest(name = "{0} is invalid") |
| 634 | + @NullSource |
| 635 | + @ValueSource(longs = -1) |
| 636 | + void testReceiveWhenOffsetIsInvalid(Long offset) { |
| 637 | + Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka); |
| 638 | + DefaultKafkaProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps); |
| 639 | + KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf, true); |
| 640 | + |
| 641 | + template.setConsumerFactory( |
| 642 | + new DefaultKafkaConsumerFactory<>(KafkaTestUtils.consumerProps("xx", "false", embeddedKafka))); |
| 643 | + TopicPartitionOffset tpoWithNullOffset = new TopicPartitionOffset(INT_KEY_TOPIC, 1, offset); |
| 644 | + |
| 645 | + assertThatExceptionOfType(KafkaException.class) |
| 646 | + .isThrownBy(() -> template.receive(Collections.singleton(tpoWithNullOffset))) |
| 647 | + .withMessage("Offset supplied in TopicPartitionOffset is invalid: " + tpoWithNullOffset); |
| 648 | + } |
| 649 | + |
| 650 | + |
624 | 651 | } |
0 commit comments