Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion spring-kafka-docs/src/main/asciidoc/kafka.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,23 @@ include::{kotlin-examples}/topics/Config.kt[tag=brokerProps]
----
====

IMPORTANT: When using Spring Boot, a `KafkaAdmin` bean is automatically registered so you only need the `NewTopic` `@Bean` s.
Starting with version 2.7, you can declare multiple `NewTopic` s in a single `KafkaAdmin.NewTopics` bean definition:

====
[source, java, indent=0, role="primary"]
.Java
----
include::{java-examples}/topics/Config.java[tag=newTopicsBean]
----
[source, kotlin, indent=0, role="secondary"]
.Kotlin
----
include::{kotlin-examples}/topics/Config.kt[tag=newTopicsBean]
----
====


IMPORTANT: When using Spring Boot, a `KafkaAdmin` bean is automatically registered so you only need the `NewTopic` (and/or `NewTopics`) `@Bean` s.

By default, if the broker is not available, a message is logged, but the context continues to load.
You can programmatically invoke the admin's `initialize()` method to try again later.
Expand All @@ -114,6 +130,11 @@ The context then fails to initialize.

NOTE: If the broker supports it (1.0.0 or higher), the admin increases the number of partitions if it is found that an existing topic has fewer partitions than the `NewTopic.numPartitions`.

Starting with version 2.7, the `KafkaAdmin` provides methods to create and examine topics at runtime.

* `createOrModifyTopics`
* `describeTopics`

For more advanced features, you can use the `AdminClient` directly.
The following example shows how to do so:

Expand Down
7 changes: 7 additions & 0 deletions spring-kafka-docs/src/main/asciidoc/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,10 @@ See <<replying-template>> for more information.

By default, the `StreamsBuilderFactoryBean` is now configured to not clean up local state.
See <<streams-config>> for more information.

[[x27-admin]]
==== `KafkaAdmin` Changes

New methods `createOrModifyTopics` and `describeTopics` have been added.
`KafkaAdmin.NewTopics` has been added to facilitate configuring multiple topics in a single bean.
See <<kafka-admin>> for more information.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaAdmin.NewTopics;

/**
* Snippet for Configuring Topics section.
Expand All @@ -37,7 +38,7 @@
*/
public class Config {

// tag::topicBeans[]
// tag::topicBeans[]
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
Expand Down Expand Up @@ -74,25 +75,39 @@ public NewTopic topic3() {
}
// end::topicBeans[]
// tag::brokerProps[]
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}

@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}

@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
// end::brokerProps[]
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
// end::brokerProps[]
// tag::newTopicsBean[]
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
// end::newTopicsBean[]

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,5 +71,18 @@ class Config {
@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()
// end::brokerProps[]
// tag::newTopicsBean[]
@Bean
fun topics456() = KafkaAdmin.NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build()
)
// end::newTopicsBean[]

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2020 the original author or authors.
* Copyright 2017-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -56,7 +57,8 @@
*
* @since 1.3
*/
public class KafkaAdmin extends KafkaResourceFactory implements ApplicationContextAware, SmartInitializingSingleton {
public class KafkaAdmin extends KafkaResourceFactory
implements ApplicationContextAware, SmartInitializingSingleton, KafkaAdminOperations {

/**
* The default close timeout duration as 10 seconds.
Expand Down Expand Up @@ -129,11 +131,7 @@ public void setAutoCreate(boolean autoCreate) {
this.autoCreate = autoCreate;
}


/**
* Get an unmodifiable copy of this admin's configuration.
* @return the configuration map.
*/
@Override
public Map<String, Object> getConfigurationProperties() {
Map<String, Object> configs2 = new HashMap<>(this.configs);
checkBootstrap(configs2);
Expand All @@ -158,13 +156,14 @@ public void afterSingletonsInstantiated() {
* @see #setAutoCreate(boolean)
*/
public final boolean initialize() {
Collection<NewTopic> newTopics = this.applicationContext.getBeansOfType(NewTopic.class, false, false).values();
Collection<NewTopic> newTopics = new ArrayList<>(
this.applicationContext.getBeansOfType(NewTopic.class, false, false).values());
Collection<NewTopics> wrappers = this.applicationContext.getBeansOfType(NewTopics.class, false, false).values();
wrappers.forEach(wrapper -> newTopics.addAll(wrapper.getNewTopics()));
if (newTopics.size() > 0) {
AdminClient adminClient = null;
try {
Map<String, Object> configs2 = new HashMap<>(this.configs);
checkBootstrap(configs2);
adminClient = AdminClient.create(configs2);
adminClient = createAdmin();
}
catch (Exception e) {
if (!this.initializingContext || this.fatalIfBrokerNotAvailable) {
Expand All @@ -176,7 +175,7 @@ public final boolean initialize() {
}
if (adminClient != null) {
try {
addTopicsIfNeeded(adminClient, newTopics);
addOrModifyTopicsIfNeeded(adminClient, newTopics);
return true;
}
catch (Exception e) {
Expand All @@ -197,7 +196,39 @@ public final boolean initialize() {
return false;
}

private void addTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {
@Override
public void createOrModifyTopics(NewTopic... topics) {
try (AdminClient client = createAdmin()) {
addOrModifyTopicsIfNeeded(client, Arrays.asList(topics));
}
}

@Override
public Map<String, TopicDescription> describeTopics(String... topicNames) {
try (AdminClient admin = createAdmin()) {
Map<String, TopicDescription> results = new HashMap<>();
DescribeTopicsResult topics = admin.describeTopics(Arrays.asList(topicNames));
try {
results.putAll(topics.all().get(this.operationTimeout, TimeUnit.SECONDS));
return results;
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new KafkaException("Interrupted while getting topic descriptions", ie);
}
catch (TimeoutException | ExecutionException ex) {
throw new KafkaException("Failed to obtain topic descriptions", ex);
}
}
}

private AdminClient createAdmin() {
Map<String, Object> configs2 = new HashMap<>(this.configs);
checkBootstrap(configs2);
return AdminClient.create(configs2);
}

private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection<NewTopic> topics) {
if (topics.size() > 0) {
Map<String, NewTopic> topicNameToTopic = new HashMap<>();
topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t));
Expand Down Expand Up @@ -298,4 +329,29 @@ private void modifyTopics(AdminClient adminClient, Map<String, NewPartitions> to
}
}

/**
* Wrapper for a collection of {@link NewTopic} to facilitated declaring multiple
* topics as as single bean.
*
* @since 2.7
*
*/
public static class NewTopics {

final Collection<NewTopic> newTopics = new ArrayList<>();

/**
* Construct an instance with the {@link NewTopic}s.
* @param newTopics the topics.
*/
public NewTopics(NewTopic... newTopics) {
this.newTopics.addAll(Arrays.asList(newTopics));
}

Collection<NewTopic> getNewTopics() {
return this.newTopics;
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.kafka.core;

import java.util.Map;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;

/**
* Provides a number of convenience methods wrapping {@code AdminClient}.
*
* @author Gary Russell
* @since 2.7
*
*/
public interface KafkaAdminOperations {

/**
* Get an unmodifiable copy of this admin's configuration.
* @return the configuration map.
*/
Map<String, Object> getConfigurationProperties();

/**
* Create topics if they don't exist or increase the number of partitions if needed.
* @param topics the topics.
*/
void createOrModifyTopics(NewTopic... topics);

/**
* Obtain {@link TopicDescription}s for these topics.
* @param topicNames the topic names.
* @return a map of name:topicDescription.
*/
Map<String, TopicDescription> describeTopics(String... topicNames);

}
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ private Map<String, DestinationTopicHolder> correlatePairSourceAndDestinationVal
private DestinationTopic getNextDestinationTopic(List<DestinationTopic> destinationList, int index) {
return index != destinationList.size() - 1
? destinationList.get(index + 1)
: new DestinationTopic(destinationList.get(index).getDestinationName() + this.NO_OPS_SUFFIX,
destinationList.get(index), this.NO_OPS_SUFFIX, DestinationTopic.Type.NO_OPS);
: new DestinationTopic(destinationList.get(index).getDestinationName() + NO_OPS_SUFFIX,
destinationList.get(index), NO_OPS_SUFFIX, DestinationTopic.Type.NO_OPS);
}

@Override
Expand Down
Loading