diff --git a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaUtils.java b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaUtils.java index 543a5ecfc5497..62e7a529fcfbd 100644 --- a/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaUtils.java +++ b/plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaUtils.java @@ -53,19 +53,16 @@ public static void createTopic(String topicName, int numOfPartitions, String boo } // validates topic is created - await().atMost(3, TimeUnit.SECONDS).until(() -> checkTopicExistence(topicName, bootstrapServers)); + await().atMost(10, TimeUnit.SECONDS).until(() -> checkTopicExistence(topicName, bootstrapServers)); } public static boolean checkTopicExistence(String topicName, String bootstrapServers) { return getAdminClient(bootstrapServers, (client -> { - Map> topics = client.describeTopics(List.of(topicName)).values(); + Map> topics = client.describeTopics(List.of(topicName)).topicNameValues(); try { return topics.containsKey(topicName) && topics.get(topicName).get().name().equals(topicName); - } catch (InterruptedException e) { - LOGGER.error("error on checkTopicExistence", e); - return false; - } catch (ExecutionException e) { + } catch (InterruptedException | ExecutionException e) { LOGGER.error("error on checkTopicExistence", e); return false; } @@ -73,13 +70,12 @@ public static boolean checkTopicExistence(String topicName, String bootstrapServ } private static Rep getAdminClient(String bootstrapServer, Function function) { - AdminClient adminClient = KafkaAdminClient.create( - Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer, AdminClientConfig.CLIENT_ID_CONFIG, "test") - ); - try { + try ( + AdminClient adminClient = KafkaAdminClient.create( + Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer, AdminClientConfig.CLIENT_ID_CONFIG, "test") + ) + ) { return function.apply(adminClient); - } finally { - adminClient.close(); } } }