diff --git a/application.example.yml b/application.example.yml index e8baf5130..f9579b29d 100644 --- a/application.example.yml +++ b/application.example.yml @@ -148,6 +148,10 @@ akhq: - "^.*-rekey$" skip-consumer-groups: false # Skip loading consumer group information when showing topics skip-last-record: false # Skip loading last record date information when showing topics + # Retry options for topic operations + retry: + topic-exists: # Delay between retries when checking for existence of newly created topics. This is needed as it might take the kafka broker a few seconds to create new topics. + delay: "3s" # Topic display data options (optional) topic-data: diff --git a/src/main/java/org/akhq/repositories/TopicRepository.java b/src/main/java/org/akhq/repositories/TopicRepository.java index ec3c1c398..ffe45856f 100644 --- a/src/main/java/org/akhq/repositories/TopicRepository.java +++ b/src/main/java/org/akhq/repositories/TopicRepository.java @@ -2,6 +2,7 @@ import io.micronaut.context.ApplicationContext; import io.micronaut.context.annotation.Value; +import io.micronaut.retry.annotation.Retryable; import io.micronaut.security.authentication.Authentication; import io.micronaut.security.utils.SecurityService; import org.akhq.configs.SecurityProperties; @@ -19,6 +20,7 @@ import java.util.*; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; @Singleton public class TopicRepository extends AbstractRepository { @@ -140,6 +142,7 @@ private boolean isStream(String name) { public void create(String clusterId, String name, int partitions, short replicationFactor, List configs) throws ExecutionException, InterruptedException { kafkaWrapper.createTopics(clusterId, name, partitions, replicationFactor); + checkIfTopicExists(clusterId, name); configRepository.updateTopic(clusterId, name, configs); } @@ -147,6 +150,14 @@ public void delete(String clusterId, String name) throws ExecutionException, Int kafkaWrapper.deleteTopics(clusterId, name); } + @Retryable( + includes = { + UnknownTopicOrPartitionException.class + }, delay = "${akhq.topic.retry.topic-exists.delay:3s}") + void checkIfTopicExists(String clusterId, String name) throws ExecutionException { + kafkaWrapper.describeTopics(clusterId, Collections.singletonList(name)); + } + private Optional> getTopicFilterRegex() { List topicFilterRegex = new ArrayList<>(); diff --git a/src/test/resources/application.yml b/src/test/resources/application.yml index 56d4ccfa9..41c6fef68 100644 --- a/src/test/resources/application.yml +++ b/src/test/resources/application.yml @@ -151,5 +151,4 @@ akhq: users: - username: user2 groups: - - operator - + - operator \ No newline at end of file