From 131080cde6274fa51b8857c9b68e4ebb7ce83506 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Sun, 24 Apr 2022 23:25:58 -0500 Subject: [PATCH 1/2] Remove Kafka binder dependence on KafkaAutoConfiguration Fixes #2349 --- .../ReactorKafkaBinderConfiguration.java | 11 ++- .../ReactorKafkaBinderIntegrationTests.java | 76 +++++++++------- .../GlobalKTableBinderConfiguration.java | 9 +- .../streams/KStreamBinderConfiguration.java | 9 +- .../streams/KTableBinderConfiguration.java | 9 +- ...StreamsBinderSupportAutoConfiguration.java | 2 +- .../KafkaStreamsBinderBootstrapTest.java | 86 +++++++++++-------- .../config/KafkaBinderConfiguration.java | 8 +- .../kafka/KafkaBinderConfigurationTest.java | 29 ++----- .../bootstrap/KafkaBinderBootstrapTest.java | 60 +++++++++---- 10 files changed, 171 insertions(+), 128 deletions(-) diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderConfiguration.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderConfiguration.java index bee014a3a7..1c88515f5e 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderConfiguration.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/main/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderConfiguration.java @@ -18,7 +18,6 @@ import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; -import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.Binder; @@ -28,12 +27,16 @@ import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Import; +/** + * Binder configuration for ReactorKafka. + * + * @author Gary Russell + * @author Chris Bono + */ @Configuration(proxyBeanMethods = false) @ConditionalOnMissingBean(Binder.class) -@Import({ KafkaAutoConfiguration.class }) -@EnableConfigurationProperties({ KafkaExtendedBindingProperties.class }) +@EnableConfigurationProperties({ KafkaProperties.class, KafkaExtendedBindingProperties.class }) public class ReactorKafkaBinderConfiguration { @Bean diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java index 76eb380842..9c6d90454a 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-reactive/src/test/java/org/springframework/cloud/stream/binder/reactorkafka/ReactorKafkaBinderIntegrationTests.java @@ -24,14 +24,16 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import reactor.core.publisher.Flux; import reactor.kafka.receiver.ReceiverRecord; -import org.springframework.boot.SpringApplication; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.integration.support.MessageBuilder; @@ -42,69 +44,74 @@ import org.springframework.kafka.support.converter.MessagingMessageConverter; import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.kafka.test.EmbeddedKafkaBroker; -import org.springframework.kafka.test.condition.EmbeddedKafkaCondition; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.messaging.Message; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringExtension; import static org.assertj.core.api.Assertions.assertThat; /** + * Integration tests for {@link ReactorKafkaBinder}. + * * @author Soby Chacko * @author Gary Russell + * @author Chris Bono */ +@ExtendWith(SpringExtension.class) +@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD) @EmbeddedKafka(topics = { "uppercased-words", "lowercased-words" }) -public class ReactorKafkaBinderIntegrationTests { - - private static final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker(); +class ReactorKafkaBinderIntegrationTests { - private static Consumer consumer1; + @Autowired + private EmbeddedKafkaBroker embeddedKafka; - private static Consumer consumer2; + @ParameterizedTest + @ValueSource(booleans = { false, true }) + void endToEndReactorKafkaBinder(boolean excludeKafkaAutoConfig) { - @BeforeAll - public static void setUp() { - Map consumerProps = KafkaTestUtils.consumerProps("group", "false", - embeddedKafka); + Map consumerProps = KafkaTestUtils.consumerProps("group1", "false", embeddedKafka); consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps); - consumer1 = cf.createConsumer(); + Consumer consumer1 = cf.createConsumer(); embeddedKafka.consumeFromEmbeddedTopics(consumer1, "uppercased-words"); - consumer2 = cf.createConsumer("group2", null); + Consumer consumer2 = cf.createConsumer("group2", null); embeddedKafka.consumeFromEmbeddedTopics(consumer2, "lowercased-words"); - } - - @Test - void testEndtoEndReactorKafkaBinder() throws Exception { - SpringApplication app = new SpringApplication(ReactiveKafkaApplication.class); - app.setWebApplicationType(WebApplicationType.NONE); - try (ConfigurableApplicationContext context = app.run( + try (ConfigurableApplicationContext context = new SpringApplicationBuilder(ReactiveKafkaApplication.class) + .web(WebApplicationType.NONE).run( "--server.port=0", "--spring.jmx.enabled=false", "--spring.cloud.function.definition=uppercase;lowercase", "--spring.cloud.stream.function.reactive.uppercase=true", "--spring.cloud.stream.function.reactive.lowercase=true", "--spring.cloud.stream.bindings.uppercase-in-0.group=grp1", - "--spring.cloud.stream.bindings.uppercase-in-0.destination=words", + "--spring.cloud.stream.bindings.uppercase-in-0.destination=words1", "--spring.cloud.stream.bindings.uppercase-out-0.destination=uppercased-words", "--spring.cloud.stream.bindings.lowercase-in-0.group=grp2", - "--spring.cloud.stream.bindings.lowercase-in-0.destination=words1", + "--spring.cloud.stream.bindings.lowercase-in-0.destination=words2", "--spring.cloud.stream.bindings.lowercase-out-0.destination=lowercased-words", "--spring.cloud.stream.kafka.bindings.lowercase-in-0.consumer.converterBeanName=fullRR", - "--spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getBrokersAsString())) { + "--spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getBrokersAsString(), + excludeKafkaAutoConfigParam(excludeKafkaAutoConfig))) { Map senderProps = KafkaTestUtils.producerProps(embeddedKafka); DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory<>(senderProps); try { KafkaTemplate template = new KafkaTemplate<>(pf, true); - template.send("words", "foobar"); - template.send("words1", "BAZQUX"); - - ConsumerRecord cr = KafkaTestUtils.getSingleRecord(consumer1, "uppercased-words"); - assertThat(cr.value()).isEqualTo("FOOBAR"); - cr = KafkaTestUtils.getSingleRecord(consumer2, "lowercased-words"); - assertThat(cr.value()).isEqualTo("bazqux"); + template.send("words1", "foobar"); + template.send("words2", "BAZQUX"); + + assertThat(KafkaTestUtils.getSingleRecord(consumer1, "uppercased-words")) + .isNotNull() + .extracting(ConsumerRecord::value) + .isEqualTo("FOOBAR"); + + assertThat(KafkaTestUtils.getSingleRecord(consumer2, "lowercased-words")) + .isNotNull() + .extracting(ConsumerRecord::value) + .isEqualTo("bazqux"); } finally { pf.destroy(); @@ -112,6 +119,11 @@ void testEndtoEndReactorKafkaBinder() throws Exception { } } + private String excludeKafkaAutoConfigParam(boolean excludeKafkaAutoConfig) { + return excludeKafkaAutoConfig ? + "--spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration" : "foo=bar"; + } + @EnableAutoConfiguration public static class ReactiveKafkaApplication { diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java index 6fd2f7a2d2..f315b41167 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/GlobalKTableBinderConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,8 +22,8 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer; import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner; import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties; @@ -37,13 +37,14 @@ * Configuration for GlobalKTable binder. * * @author Soby Chacko + * @author Chris Bono * @since 2.1.0 */ @Configuration -@Import({ KafkaAutoConfiguration.class, - MultiBinderPropertiesConfiguration.class, +@Import({ MultiBinderPropertiesConfiguration.class, KafkaStreamsBinderHealthIndicatorConfiguration.class, KafkaStreamsJaasConfiguration.class}) +@EnableConfigurationProperties(KafkaProperties.class) public class GlobalKTableBinderConfiguration { @Bean diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java index 2341759c70..93d4e269f4 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KStreamBinderConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2021 the original author or authors. + * Copyright 2017-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,8 +19,8 @@ import org.springframework.beans.factory.ObjectProvider; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer; import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner; import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties; @@ -36,12 +36,13 @@ * @author Marius Bogoevici * @author Gary Russell * @author Soby Chacko + * @author Chris Bono */ @Configuration -@Import({ KafkaAutoConfiguration.class, - MultiBinderPropertiesConfiguration.class, +@Import({ MultiBinderPropertiesConfiguration.class, KafkaStreamsBinderHealthIndicatorConfiguration.class, KafkaStreamsJaasConfiguration.class}) +@EnableConfigurationProperties(KafkaProperties.class) public class KStreamBinderConfiguration { @Bean diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java index ef0d5c94a7..48f8543f9b 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KTableBinderConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,8 +22,8 @@ import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.config.BeanFactoryPostProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.kafka.provisioning.AdminClientConfigCustomizer; import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner; import org.springframework.cloud.stream.binder.kafka.streams.properties.KafkaStreamsBinderConfigurationProperties; @@ -37,13 +37,14 @@ * Configuration for KTable binder. * * @author Soby Chacko + * @author Chris Bono */ @SuppressWarnings("ALL") @Configuration -@Import({ KafkaAutoConfiguration.class, - MultiBinderPropertiesConfiguration.class, +@Import({ MultiBinderPropertiesConfiguration.class, KafkaStreamsBinderHealthIndicatorConfiguration.class, KafkaStreamsJaasConfiguration.class}) +@EnableConfigurationProperties(KafkaProperties.class) public class KTableBinderConfiguration { @Bean diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java index e522988b84..f0482902fd 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/KafkaStreamsBinderSupportAutoConfiguration.java @@ -83,7 +83,7 @@ * @author Gary Russell */ @Configuration -@EnableConfigurationProperties(KafkaStreamsExtendedBindingProperties.class) +@EnableConfigurationProperties({ KafkaProperties.class, KafkaStreamsExtendedBindingProperties.class }) @ConditionalOnBean(BindingService.class) @AutoConfigureAfter(BindingServiceConfiguration.class) public class KafkaStreamsBinderSupportAutoConfiguration { diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java index 2ffc8ab6a6..6ed8b0aaca 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/test/java/org/springframework/cloud/stream/binder/kafka/streams/bootstrap/KafkaStreamsBinderBootstrapTest.java @@ -29,6 +29,8 @@ import org.apache.kafka.streams.kstream.KTable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -44,11 +46,14 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat; /** + * Integration tests to verify the bootstrap of a SpringBoot application using the KafkaStreams binder. + * * @author Soby Chacko * @author Eduard Domínguez + * @author Chris Bono */ @EmbeddedKafka -public class KafkaStreamsBinderBootstrapTest { +class KafkaStreamsBinderBootstrapTest { private static final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker(); @@ -57,54 +62,61 @@ public void before() { System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM); } - @Test - void testKStreamBinderWithCustomEnvironmentCanStart() { - ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder( - SimpleKafkaStreamsApplication.class).web(WebApplicationType.NONE).run( - "--spring.cloud.function.definition=input1;input2;input3", - "--spring.cloud.stream.kafka.streams.bindings.input1-in-0.consumer.application-id" - + "=testKStreamBinderWithCustomEnvironmentCanStart", + @ParameterizedTest + @ValueSource(booleans = { false, true }) + void kafkaStreamsBinderWithStandardConfigCanStart(boolean excludeKafkaAutoConfig) { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder(SimpleKafkaStreamsApplication.class) + .web(WebApplicationType.NONE).run( + "--spring.cloud.function.definition=input1;input2;input3", + "--spring.cloud.stream.kafka.streams.bindings.input1-in-0.consumer.application-id" + + "=testKafkaStreamsBinderWithStandardConfigurationCanStart", + "--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.application-id" + + "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foo", + "--spring.cloud.stream.kafka.streams.bindings.input3-in-0.consumer.application-id" + + "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foobar", + "--spring.cloud.stream.kafka.streams.binder.brokers=" + + embeddedKafka.getBrokersAsString(), + excludeKafkaAutoConfigParam(excludeKafkaAutoConfig))) { // @checkstyle:off + } // @checkstyle:on + } + + @ParameterizedTest + @ValueSource(booleans = { false, true }) + void kafkaStreamsBinderWithCustomConfigCanStart(boolean excludeKafkaAutoConfig) { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder(SimpleKafkaStreamsApplication.class) + .web(WebApplicationType.NONE).run( + "--spring.cloud.function.definition=input1;input2;input3", + "--spring.cloud.stream.kafka.streams.bindings.input1-in-0.consumer.application-id" + + "=testKStreamBinderWithCustomEnvironmentCanStart", "--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.application-id" - + "=testKStreamBinderWithCustomEnvironmentCanStart-foo", + + "=testKStreamBinderWithCustomEnvironmentCanStart-foo", "--spring.cloud.stream.kafka.streams.bindings.input3-in-0.consumer.application-id" - + "=testKStreamBinderWithCustomEnvironmentCanStart-foobar", - "--spring.cloud.stream.bindings.input1-in-0.destination=foo", - "--spring.cloud.stream.bindings.input1-in-0.binder=kstreamBinder", - "--spring.cloud.stream.binders.kstreamBinder.type=kstream", - "--spring.cloud.stream.binders.kstreamBinder.environment" - + ".spring.cloud.stream.kafka.streams.binder.brokers" - + "=" + embeddedKafka.getBrokersAsString(), + + "=testKStreamBinderWithCustomEnvironmentCanStart-foobar", + "--spring.cloud.stream.bindings.input1-in-0.destination=foo", + "--spring.cloud.stream.bindings.input1-in-0.binder=kstreamBinder", + "--spring.cloud.stream.binders.kstreamBinder.type=kstream", + "--spring.cloud.stream.binders.kstreamBinder.environment" + + ".spring.cloud.stream.kafka.streams.binder.brokers" + + "=" + embeddedKafka.getBrokersAsString(), "--spring.cloud.stream.bindings.input2-in-0.destination=bar", "--spring.cloud.stream.bindings.input2-in-0.binder=ktableBinder", "--spring.cloud.stream.binders.ktableBinder.type=ktable", "--spring.cloud.stream.binders.ktableBinder.environment" - + ".spring.cloud.stream.kafka.streams.binder.brokers" - + "=" + embeddedKafka.getBrokersAsString(), + + ".spring.cloud.stream.kafka.streams.binder.brokers" + + "=" + embeddedKafka.getBrokersAsString(), "--spring.cloud.stream.bindings.input3-in-0.destination=foobar", "--spring.cloud.stream.bindings.input3-in-0.binder=globalktableBinder", "--spring.cloud.stream.binders.globalktableBinder.type=globalktable", "--spring.cloud.stream.binders.globalktableBinder.environment" - + ".spring.cloud.stream.kafka.streams.binder.brokers" - + "=" + embeddedKafka.getBrokersAsString()); - - applicationContext.close(); + + ".spring.cloud.stream.kafka.streams.binder.brokers" + + "=" + embeddedKafka.getBrokersAsString(), + excludeKafkaAutoConfigParam(excludeKafkaAutoConfig))) { // @checkstyle:off + } // @checkstyle:on } - @Test - void testKafkaStreamsBinderWithStandardConfigurationCanStart() { - ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder( - SimpleKafkaStreamsApplication.class).web(WebApplicationType.NONE).run( - "--spring.cloud.function.definition=input1;input2;input3", - "--spring.cloud.stream.kafka.streams.bindings.input1-in-0.consumer.application-id" - + "=testKafkaStreamsBinderWithStandardConfigurationCanStart", - "--spring.cloud.stream.kafka.streams.bindings.input2-in-0.consumer.application-id" - + "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foo", - "--spring.cloud.stream.kafka.streams.bindings.input3-in-0.consumer.application-id" - + "=testKafkaStreamsBinderWithStandardConfigurationCanStart-foobar", - "--spring.cloud.stream.kafka.streams.binder.brokers=" - + embeddedKafka.getBrokersAsString()); - - applicationContext.close(); + private String excludeKafkaAutoConfigParam(boolean excludeKafkaAutoConfig) { + return excludeKafkaAutoConfig ? + "--spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration" : "foo=bar"; } @Test diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java index c32de11341..7b31b806fb 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/config/KafkaBinderConfiguration.java @@ -1,5 +1,5 @@ /* - * Copyright 2015-2019 the original author or authors. + * Copyright 2015-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,7 +25,6 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; -import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.cloud.stream.binder.Binder; @@ -78,11 +77,12 @@ * @author Oleg Zhurakousky * @author Artem Bilan * @author Aldo Sinanaj + * @author Chris Bono */ @Configuration(proxyBeanMethods = false) @ConditionalOnMissingBean(Binder.class) -@Import({ KafkaAutoConfiguration.class, KafkaBinderHealthIndicatorConfiguration.class }) -@EnableConfigurationProperties({ KafkaExtendedBindingProperties.class }) +@Import({ KafkaBinderHealthIndicatorConfiguration.class }) +@EnableConfigurationProperties({ KafkaProperties.class, KafkaExtendedBindingProperties.class }) public class KafkaBinderConfiguration { @Bean diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationTest.java index 5a0b8bc6b6..70f9ea8e8f 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationTest.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderConfigurationTest.java @@ -16,42 +16,25 @@ package org.springframework.cloud.stream.binder.kafka; -import java.lang.reflect.Field; - import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration; -import org.springframework.kafka.support.ProducerListener; -import org.springframework.test.context.junit.jupiter.SpringExtension; -import org.springframework.util.ReflectionUtils; +import org.springframework.test.util.ReflectionTestUtils; import static org.assertj.core.api.Assertions.assertThat; /** * @author Ilayaperumal Gopinathan + * @author Chris Bono */ -@ExtendWith(SpringExtension.class) -@SpringBootTest(classes = { KafkaBinderConfiguration.class, KafkaAutoConfiguration.class, - KafkaBinderConfigurationTest.class }) -public class KafkaBinderConfigurationTest { - - @Autowired - private KafkaMessageChannelBinder kafkaMessageChannelBinder; +@SpringBootTest(classes = { KafkaBinderConfiguration.class }) +class KafkaBinderConfigurationTest { @Test - void testKafkaBinderProducerListener() { - assertThat(this.kafkaMessageChannelBinder).isNotNull(); - Field producerListenerField = ReflectionUtils.findField( - KafkaMessageChannelBinder.class, "producerListener", - ProducerListener.class); - ReflectionUtils.makeAccessible(producerListenerField); - ProducerListener producerListener = (ProducerListener) ReflectionUtils - .getField(producerListenerField, this.kafkaMessageChannelBinder); - assertThat(producerListener).isNotNull(); + void binderAutoConfiguredWithProducerListener(@Autowired KafkaMessageChannelBinder kafkaMessageChannelBinder) { + assertThat(ReflectionTestUtils.getField(kafkaMessageChannelBinder, "producerListener")).isNotNull(); } } diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/bootstrap/KafkaBinderBootstrapTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/bootstrap/KafkaBinderBootstrapTest.java index 2be13dbcb1..e661b86245 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/bootstrap/KafkaBinderBootstrapTest.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/bootstrap/KafkaBinderBootstrapTest.java @@ -16,37 +16,67 @@ package org.springframework.cloud.stream.binder.kafka.bootstrap; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.kafka.test.EmbeddedKafkaBroker; -import org.springframework.kafka.test.condition.EmbeddedKafkaCondition; import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.test.context.junit.jupiter.SpringExtension; /** + * Integration tests to verify the bootstrap of a SpringBoot application using the Kafka binder. + * * @author Marius Bogoevici + * @author Chris Bono */ +@ExtendWith(SpringExtension.class) @EmbeddedKafka(count = 1, controlledShutdown = true) -public class KafkaBinderBootstrapTest { +class KafkaBinderBootstrapTest { + + @Autowired + private EmbeddedKafkaBroker embeddedKafka; + + @ParameterizedTest + @ValueSource(booleans = { false, true }) + void kafkaBinderWithStandardConfigCanStart(boolean excludeKafkaAutoConfig) { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder(SimpleApplication.class) + .web(WebApplicationType.NONE).run( + "--spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getBrokersAsString(), + excludeKafkaAutoConfigParam(excludeKafkaAutoConfig))) { // @checkstyle:off + } // @checkstyle:on + + } - private static EmbeddedKafkaBroker embeddedKafka; + @ParameterizedTest + @ValueSource(booleans = { false, true }) + void kafkaBinderWithCustomConfigCanStart(boolean excludeKafkaAutoConfig) { + try (ConfigurableApplicationContext context = new SpringApplicationBuilder(SimpleApplication.class) + .web(WebApplicationType.NONE).run( + "--spring.cloud.stream.bindings.uppercase-in-0.destination=inputTopic", + "--spring.cloud.stream.bindings.uppercase-in-0.group=inputGroup", + "--spring.cloud.stream.bindings.uppercase-in-0.binder=kafka1", + "--spring.cloud.stream.bindings.uppercase-out-0.destination=outputTopic", + "--spring.cloud.stream.bindings.uppercase-out-0.binder=kafka2", + "--spring.cloud.stream.binders.kafka1.type=kafka", + "--spring.cloud.stream.binders.kafka2.type=kafka", + "--spring.cloud.stream.binders.kafka1.environment" + + ".spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getBrokersAsString(), + "--spring.cloud.stream.binders.kafka2.environment" + + ".spring.cloud.stream.kafka.binder.brokers=" + embeddedKafka.getBrokersAsString(), + excludeKafkaAutoConfigParam(excludeKafkaAutoConfig))) { // @checkstyle:off + } // @checkstyle:on - @BeforeAll - public static void setup() { - embeddedKafka = EmbeddedKafkaCondition.getBroker(); } - @Test - void testKafkaBinderConfiguration() throws Exception { - ConfigurableApplicationContext applicationContext = new SpringApplicationBuilder( - SimpleApplication.class).web(WebApplicationType.NONE).run( - "--spring.cloud.stream.kafka.binder.brokers=" - + embeddedKafka.getBrokersAsString()); - applicationContext.close(); + private String excludeKafkaAutoConfigParam(boolean excludeKafkaAutoConfig) { + return excludeKafkaAutoConfig ? + "--spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration" : "a=a"; } @SpringBootApplication From fb3f7d119f1eb0f05a5d8a692d9ffac05a3caa22 Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Mon, 25 Apr 2022 13:39:30 -0500 Subject: [PATCH 2/2] Use single instance of EmbeddedKafka for KafkaBinderBootstrapTest --- .../kafka/bootstrap/KafkaBinderBootstrapTest.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/bootstrap/KafkaBinderBootstrapTest.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/bootstrap/KafkaBinderBootstrapTest.java index e661b86245..7433f4b955 100644 --- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/bootstrap/KafkaBinderBootstrapTest.java +++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/bootstrap/KafkaBinderBootstrapTest.java @@ -16,18 +16,16 @@ package org.springframework.cloud.stream.binder.kafka.bootstrap; -import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.condition.EmbeddedKafkaCondition; import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.test.context.junit.jupiter.SpringExtension; /** * Integration tests to verify the bootstrap of a SpringBoot application using the Kafka binder. @@ -35,12 +33,10 @@ * @author Marius Bogoevici * @author Chris Bono */ -@ExtendWith(SpringExtension.class) -@EmbeddedKafka(count = 1, controlledShutdown = true) +@EmbeddedKafka(controlledShutdown = true) class KafkaBinderBootstrapTest { - @Autowired - private EmbeddedKafkaBroker embeddedKafka; + private static final EmbeddedKafkaBroker embeddedKafka = EmbeddedKafkaCondition.getBroker(); @ParameterizedTest @ValueSource(booleans = { false, true })