Skip to content

Commit

Permalink
feat(topic): ensure topic existence after creating new topic (#667)
Browse files Browse the repository at this point in the history
Ensure topic existence after creating a new topic to cater for Kafka broker delays in the topic creation process. Mitigates UnknownTopicOrPartition exceptions being thrown when we try to interact with the newly created topic afterwards.
  • Loading branch information
tobiade authored Apr 20, 2021
1 parent 94cbf11 commit b21bb8f
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 2 deletions.
4 changes: 4 additions & 0 deletions application.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ akhq:
- "^.*-rekey$"
skip-consumer-groups: false # Skip loading consumer group information when showing topics
skip-last-record: false # Skip loading last record date information when showing topics
# Retry options for topic operations
retry:
topic-exists: # Delay between retries when checking for existence of newly created topics. This is needed as it might take the kafka broker a few seconds to create new topics.
delay: "3s"

# Topic display data options (optional)
topic-data:
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/org/akhq/repositories/TopicRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.micronaut.context.ApplicationContext;
import io.micronaut.context.annotation.Value;
import io.micronaut.retry.annotation.Retryable;
import io.micronaut.security.authentication.Authentication;
import io.micronaut.security.utils.SecurityService;
import org.akhq.configs.SecurityProperties;
Expand All @@ -19,6 +20,7 @@
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

@Singleton
public class TopicRepository extends AbstractRepository {
Expand Down Expand Up @@ -140,13 +142,22 @@ private boolean isStream(String name) {

public void create(String clusterId, String name, int partitions, short replicationFactor, List<org.akhq.models.Config> configs) throws ExecutionException, InterruptedException {
kafkaWrapper.createTopics(clusterId, name, partitions, replicationFactor);
checkIfTopicExists(clusterId, name);
configRepository.updateTopic(clusterId, name, configs);
}

public void delete(String clusterId, String name) throws ExecutionException, InterruptedException {
kafkaWrapper.deleteTopics(clusterId, name);
}

@Retryable(
includes = {
UnknownTopicOrPartitionException.class
}, delay = "${akhq.topic.retry.topic-exists.delay:3s}")
void checkIfTopicExists(String clusterId, String name) throws ExecutionException {
kafkaWrapper.describeTopics(clusterId, Collections.singletonList(name));
}

private Optional<List<String>> getTopicFilterRegex() {

List<String> topicFilterRegex = new ArrayList<>();
Expand Down
3 changes: 1 addition & 2 deletions src/test/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -151,5 +151,4 @@ akhq:
users:
- username: user2
groups:
- operator

- operator

0 comments on commit b21bb8f

Please sign in to comment.