diff --git a/spring-kafka-docs/src/main/asciidoc/kafka.adoc b/spring-kafka-docs/src/main/asciidoc/kafka.adoc index 636e303b09..50600bcf7c 100644 --- a/spring-kafka-docs/src/main/asciidoc/kafka.adoc +++ b/spring-kafka-docs/src/main/asciidoc/kafka.adoc @@ -105,7 +105,23 @@ include::{kotlin-examples}/topics/Config.kt[tag=brokerProps] ---- ==== -IMPORTANT: When using Spring Boot, a `KafkaAdmin` bean is automatically registered so you only need the `NewTopic` `@Bean` s. +Starting with version 2.7, you can declare multiple `NewTopic` s in a single `KafkaAdmin.NewTopics` bean definition: + +==== +[source, java, indent=0, role="primary"] +.Java +---- +include::{java-examples}/topics/Config.java[tag=newTopicsBean] +---- +[source, kotlin, indent=0, role="secondary"] +.Kotlin +---- +include::{kotlin-examples}/topics/Config.kt[tag=newTopicsBean] +---- +==== + + +IMPORTANT: When using Spring Boot, a `KafkaAdmin` bean is automatically registered so you only need the `NewTopic` (and/or `NewTopics`) `@Bean` s. By default, if the broker is not available, a message is logged, but the context continues to load. You can programmatically invoke the admin's `initialize()` method to try again later. @@ -114,6 +130,11 @@ The context then fails to initialize. NOTE: If the broker supports it (1.0.0 or higher), the admin increases the number of partitions if it is found that an existing topic has fewer partitions than the `NewTopic.numPartitions`. +Starting with version 2.7, the `KafkaAdmin` provides methods to create and examine topics at runtime. + +* `createOrModifyTopics` +* `describeTopics` + For more advanced features, you can use the `AdminClient` directly. The following example shows how to do so: diff --git a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc index 3de8235179..24d27f6802 100644 --- a/spring-kafka-docs/src/main/asciidoc/whats-new.adoc +++ b/spring-kafka-docs/src/main/asciidoc/whats-new.adoc @@ -61,3 +61,10 @@ See <> for more information. By default, the `StreamsBuilderFactoryBean` is now configured to not clean up local state. See <> for more information. + +[[x27-admin]] +==== `KafkaAdmin` Changes + +New methods `createOrModifyTopics` and `describeTopics` have been added. +`KafkaAdmin.NewTopics` has been added to facilitate configuring multiple topics in a single bean. +See <> for more information. diff --git a/spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/topics/Config.java b/spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/topics/Config.java index 75270012cd..66f9268d60 100644 --- a/spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/topics/Config.java +++ b/spring-kafka-docs/src/main/java/org/springframework/kafka/jdocs/topics/Config.java @@ -27,6 +27,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.kafka.config.TopicBuilder; import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.kafka.core.KafkaAdmin.NewTopics; /** * Snippet for Configuring Topics section. @@ -37,7 +38,7 @@ */ public class Config { - // tag::topicBeans[] + // tag::topicBeans[] @Bean public KafkaAdmin admin() { Map configs = new HashMap<>(); @@ -74,25 +75,39 @@ public NewTopic topic3() { } // end::topicBeans[] // tag::brokerProps[] - @Bean - public NewTopic topic4() { - return TopicBuilder.name("defaultBoth") - .build(); - } + @Bean + public NewTopic topic4() { + return TopicBuilder.name("defaultBoth") + .build(); + } - @Bean - public NewTopic topic5() { - return TopicBuilder.name("defaultPart") - .replicas(1) - .build(); - } + @Bean + public NewTopic topic5() { + return TopicBuilder.name("defaultPart") + .replicas(1) + .build(); + } - @Bean - public NewTopic topic6() { - return TopicBuilder.name("defaultRepl") - .partitions(3) - .build(); - } - // end::brokerProps[] + @Bean + public NewTopic topic6() { + return TopicBuilder.name("defaultRepl") + .partitions(3) + .build(); + } + // end::brokerProps[] + // tag::newTopicsBean[] + @Bean + public KafkaAdmin.NewTopics topics456() { + return new NewTopics( + TopicBuilder.name("defaultBoth") + .build(), + TopicBuilder.name("defaultPart") + .replicas(1) + .build(), + TopicBuilder.name("defaultRepl") + .partitions(3) + .build()); + } + // end::newTopicsBean[] } diff --git a/spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/topics/Config.kt b/spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/topics/Config.kt index 3ffe249c0f..f90df445c2 100644 --- a/spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/topics/Config.kt +++ b/spring-kafka-docs/src/main/kotlin/org/springframework/kafka/kdocs/topics/Config.kt @@ -71,5 +71,18 @@ class Config { @Bean fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build() // end::brokerProps[] + // tag::newTopicsBean[] + @Bean + fun topics456() = KafkaAdmin.NewTopics( + TopicBuilder.name("defaultBoth") + .build(), + TopicBuilder.name("defaultPart") + .replicas(1) + .build(), + TopicBuilder.name("defaultRepl") + .partitions(3) + .build() + ) + // end::newTopicsBean[] } \ No newline at end of file diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java index c36541acbb..c0383fdb5a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,6 +18,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -56,7 +57,8 @@ * * @since 1.3 */ -public class KafkaAdmin extends KafkaResourceFactory implements ApplicationContextAware, SmartInitializingSingleton { +public class KafkaAdmin extends KafkaResourceFactory + implements ApplicationContextAware, SmartInitializingSingleton, KafkaAdminOperations { /** * The default close timeout duration as 10 seconds. @@ -129,11 +131,7 @@ public void setAutoCreate(boolean autoCreate) { this.autoCreate = autoCreate; } - - /** - * Get an unmodifiable copy of this admin's configuration. - * @return the configuration map. - */ + @Override public Map getConfigurationProperties() { Map configs2 = new HashMap<>(this.configs); checkBootstrap(configs2); @@ -158,13 +156,14 @@ public void afterSingletonsInstantiated() { * @see #setAutoCreate(boolean) */ public final boolean initialize() { - Collection newTopics = this.applicationContext.getBeansOfType(NewTopic.class, false, false).values(); + Collection newTopics = new ArrayList<>( + this.applicationContext.getBeansOfType(NewTopic.class, false, false).values()); + Collection wrappers = this.applicationContext.getBeansOfType(NewTopics.class, false, false).values(); + wrappers.forEach(wrapper -> newTopics.addAll(wrapper.getNewTopics())); if (newTopics.size() > 0) { AdminClient adminClient = null; try { - Map configs2 = new HashMap<>(this.configs); - checkBootstrap(configs2); - adminClient = AdminClient.create(configs2); + adminClient = createAdmin(); } catch (Exception e) { if (!this.initializingContext || this.fatalIfBrokerNotAvailable) { @@ -176,7 +175,7 @@ public final boolean initialize() { } if (adminClient != null) { try { - addTopicsIfNeeded(adminClient, newTopics); + addOrModifyTopicsIfNeeded(adminClient, newTopics); return true; } catch (Exception e) { @@ -197,7 +196,39 @@ public final boolean initialize() { return false; } - private void addTopicsIfNeeded(AdminClient adminClient, Collection topics) { + @Override + public void createOrModifyTopics(NewTopic... topics) { + try (AdminClient client = createAdmin()) { + addOrModifyTopicsIfNeeded(client, Arrays.asList(topics)); + } + } + + @Override + public Map describeTopics(String... topicNames) { + try (AdminClient admin = createAdmin()) { + Map results = new HashMap<>(); + DescribeTopicsResult topics = admin.describeTopics(Arrays.asList(topicNames)); + try { + results.putAll(topics.all().get(this.operationTimeout, TimeUnit.SECONDS)); + return results; + } + catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new KafkaException("Interrupted while getting topic descriptions", ie); + } + catch (TimeoutException | ExecutionException ex) { + throw new KafkaException("Failed to obtain topic descriptions", ex); + } + } + } + + private AdminClient createAdmin() { + Map configs2 = new HashMap<>(this.configs); + checkBootstrap(configs2); + return AdminClient.create(configs2); + } + + private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection topics) { if (topics.size() > 0) { Map topicNameToTopic = new HashMap<>(); topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t)); @@ -298,4 +329,29 @@ private void modifyTopics(AdminClient adminClient, Map to } } + /** + * Wrapper for a collection of {@link NewTopic} to facilitated declaring multiple + * topics as as single bean. + * + * @since 2.7 + * + */ + public static class NewTopics { + + final Collection newTopics = new ArrayList<>(); + + /** + * Construct an instance with the {@link NewTopic}s. + * @param newTopics the topics. + */ + public NewTopics(NewTopic... newTopics) { + this.newTopics.addAll(Arrays.asList(newTopics)); + } + + Collection getNewTopics() { + return this.newTopics; + } + + } + } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdminOperations.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdminOperations.java new file mode 100644 index 0000000000..a3a2571410 --- /dev/null +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdminOperations.java @@ -0,0 +1,52 @@ +/* + * Copyright 2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.core; + +import java.util.Map; + +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.admin.TopicDescription; + +/** + * Provides a number of convenience methods wrapping {@code AdminClient}. + * + * @author Gary Russell + * @since 2.7 + * + */ +public interface KafkaAdminOperations { + + /** + * Get an unmodifiable copy of this admin's configuration. + * @return the configuration map. + */ + Map getConfigurationProperties(); + + /** + * Create topics if they don't exist or increase the number of partitions if needed. + * @param topics the topics. + */ + void createOrModifyTopics(NewTopic... topics); + + /** + * Obtain {@link TopicDescription}s for these topics. + * @param topicNames the topic names. + * @return a map of name:topicDescription. + */ + Map describeTopics(String... topicNames); + +} diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java index c1c3a8c009..b7041b8431 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DefaultDestinationTopicResolver.java @@ -164,8 +164,8 @@ private Map correlatePairSourceAndDestinationVal private DestinationTopic getNextDestinationTopic(List destinationList, int index) { return index != destinationList.size() - 1 ? destinationList.get(index + 1) - : new DestinationTopic(destinationList.get(index).getDestinationName() + this.NO_OPS_SUFFIX, - destinationList.get(index), this.NO_OPS_SUFFIX, DestinationTopic.Type.NO_OPS); + : new DestinationTopic(destinationList.get(index).getDestinationName() + NO_OPS_SUFFIX, + destinationList.get(index), NO_OPS_SUFFIX, DestinationTopic.Type.NO_OPS); } @Override diff --git a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java index e33dc365b6..62d56fbadf 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/core/KafkaAdminTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2017-2020 the original author or authors. + * Copyright 2017-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,6 +44,7 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.config.TopicBuilder; +import org.springframework.kafka.core.KafkaAdmin.NewTopics; import org.springframework.kafka.test.EmbeddedKafkaBroker; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -87,41 +88,36 @@ public void testTopicConfigs() { @Test public void testAddTopicsAndAddPartitions() throws Exception { - try (AdminClient adminClient = AdminClient.create(this.admin.getConfigurationProperties())) { - Map results = new HashMap<>(); - await().until(() -> { - DescribeTopicsResult topics = adminClient.describeTopics(Arrays.asList("foo", "bar")); - try { - results.putAll(topics.all().get(10, TimeUnit.SECONDS)); - return true; - } - catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - return true; - } - catch (ExecutionException ex) { - if (ex.getCause() instanceof UnknownTopicOrPartitionException) { - return false; - } - throw ex; - } - }); - results.forEach((name, td) -> assertThat(td.partitions()).hasSize(name.equals("foo") ? 2 : 1)); - new DirectFieldAccessor(this.topic1).setPropertyValue("numPartitions", Optional.of(4)); - new DirectFieldAccessor(this.topic2).setPropertyValue("numPartitions", Optional.of(3)); - this.admin.initialize(); - int n = 0; - await().until(() -> { - DescribeTopicsResult topics = adminClient.describeTopics(Arrays.asList("foo", "bar")); - results.putAll(topics.all().get()); - TopicDescription bar = results.values().stream() - .filter(tp -> tp.name().equals("bar")) - .findFirst() - .get(); - return bar.partitions().size() != 1; - }); - results.forEach((name, td) -> assertThat(td.partitions()).hasSize(name.equals("foo") ? 4 : 3)); - } + Map results = this.admin.describeTopics("foo", "bar"); + results.forEach((name, td) -> assertThat(td.partitions()).hasSize(name.equals("foo") ? 2 : 1)); + new DirectFieldAccessor(this.topic1).setPropertyValue("numPartitions", Optional.of(4)); + new DirectFieldAccessor(this.topic2).setPropertyValue("numPartitions", Optional.of(3)); + this.admin.initialize(); + int n = 0; + await().until(() -> { + results.putAll(this.admin.describeTopics("foo", "bar")); + TopicDescription bar = results.values().stream() + .filter(tp -> tp.name().equals("bar")) + .findFirst() + .get(); + return bar.partitions().size() != 1; + }); + results.forEach((name, td) -> assertThat(td.partitions()).hasSize(name.equals("foo") ? 4 : 3)); + new DirectFieldAccessor(this.topic1).setPropertyValue("numPartitions", Optional.of(5)); + this.admin.createOrModifyTopics(this.topic1, + TopicBuilder.name("qux") + .partitions(5) + .build()); + results.clear(); + await().until(() -> { + results.putAll(this.admin.describeTopics("foo", "qux")); + TopicDescription foo = results.values().stream() + .filter(tp -> tp.name().equals("foo")) + .findFirst() + .get(); + return foo.partitions().size() == 5; + }); + results.forEach((name, td) -> assertThat(td.partitions()).hasSize(5)); } @Test @@ -251,23 +247,16 @@ public NewTopic topic3() { } @Bean - public NewTopic topic4() { - return TopicBuilder.name("optBoth") - .build(); - } - - @Bean - public NewTopic topic5() { - return TopicBuilder.name("optPart") - .replicas(1) - .build(); - } - - @Bean - public NewTopic topic6() { - return TopicBuilder.name("optRepl") - .partitions(3) - .build(); + public NewTopics topics456() { + return new NewTopics( + TopicBuilder.name("optBoth") + .build(), + TopicBuilder.name("optPart") + .replicas(1) + .build(), + TopicBuilder.name("optRepl") + .partitions(3) + .build()); } }