From 0d0f64373c9e422d9bb0e70eef9db08cc7256f7f Mon Sep 17 00:00:00 2001 From: Juan Camilo Rada Date: Fri, 5 Jan 2018 10:51:03 +0000 Subject: [PATCH 1/6] GH-11435: Add Kafka health indicator - added kafka health indicator - added kafka health inidicator auto-configuration --- .../pom.xml | 5 ++ ...KafkaHealthIndicatorAutoConfiguration.java | 81 +++++++++++++++++ .../kafka/KafkaHealthIndicatorProperties.java | 45 ++++++++++ .../autoconfigure/kafka/package-info.java | 20 +++++ ...itional-spring-configuration-metadata.json | 6 ++ .../main/resources/META-INF/spring.factories | 1 + ...HealthIndicatorAutoConfigurationTests.java | 71 +++++++++++++++ .../spring-boot-actuator/pom.xml | 5 ++ .../actuate/kafka/KafkaHealthIndicator.java | 57 ++++++++++++ .../boot/actuate/kafka/package-info.java | 20 +++++ .../kafka/KafkaHealthIndicatorTest.java | 88 +++++++++++++++++++ .../kafka/KafkaAutoConfiguration.java | 8 ++ 12 files changed, 407 insertions(+) create mode 100644 spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java create mode 100644 spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java create mode 100644 spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java create mode 100644 spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java create mode 100644 spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java create mode 100644 spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java create mode 100644 spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTest.java diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml b/spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml index 684b5f2307ff..817205c3b985 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml @@ -193,6 +193,11 @@ elasticsearch true + + org.springframework.kafka + spring-kafka + true + org.flywaydb flyway-core diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java new file mode 100644 index 000000000000..d72756c4caae --- /dev/null +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java @@ -0,0 +1,81 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.actuate.autoconfigure.kafka; + +import java.time.Duration; +import java.util.Map; + +import org.apache.kafka.clients.admin.AdminClient; + +import org.springframework.boot.actuate.autoconfigure.health.CompositeHealthIndicatorConfiguration; +import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator; +import org.springframework.boot.actuate.autoconfigure.health.HealthIndicatorAutoConfiguration; +import org.springframework.boot.actuate.health.HealthIndicator; +import org.springframework.boot.actuate.kafka.KafkaHealthIndicator; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.AutoConfigureBefore; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * {@link EnableAutoConfiguration Auto-configuration} for {@link KafkaHealthIndicator}. + * + * @author Juan Rada + */ +@Configuration +@ConditionalOnEnabledHealthIndicator("kafka") +@AutoConfigureBefore(HealthIndicatorAutoConfiguration.class) +@AutoConfigureAfter(KafkaAutoConfiguration.class) +public class KafkaHealthIndicatorAutoConfiguration { + + @Configuration + @ConditionalOnBean(AdminClient.class) + @EnableConfigurationProperties(KafkaHealthIndicatorProperties.class) + static class KafkaClientHealthIndicatorConfiguration extends + CompositeHealthIndicatorConfiguration { + + private final Map admins; + + private final KafkaHealthIndicatorProperties properties; + + KafkaClientHealthIndicatorConfiguration(Map admins, + KafkaHealthIndicatorProperties properties) { + this.admins = admins; + this.properties = properties; + } + + @Bean + @ConditionalOnMissingBean(name = "kafkaHealthIndicator") + public HealthIndicator kafkaHealthIndicator() { + return createHealthIndicator(this.admins); + } + + @Override + protected KafkaHealthIndicator createHealthIndicator(AdminClient source) { + Duration responseTimeout = this.properties.getResponseTimeout(); + + return new KafkaHealthIndicator(source, + responseTimeout == null ? 100L : responseTimeout.toMillis()); + } + } + +} diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java new file mode 100644 index 000000000000..52994e3740fb --- /dev/null +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java @@ -0,0 +1,45 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.actuate.autoconfigure.kafka; + +import java.time.Duration; + +import org.springframework.boot.actuate.kafka.KafkaHealthIndicator; +import org.springframework.boot.context.properties.ConfigurationProperties; + +/** + * External configuration properties for {@link KafkaHealthIndicator}. + * + * @author Juan Rada + */ +@ConfigurationProperties(prefix = "management.health.kafka", ignoreUnknownFields = false) +public class KafkaHealthIndicatorProperties { + + /** + * Time to wait for a response from the cluster description operation. + */ + private Duration responseTimeout = Duration.ofMillis(100); + + public Duration getResponseTimeout() { + return this.responseTimeout; + } + + public void setResponseTimeout(Duration responseTimeout) { + this.responseTimeout = responseTimeout; + } + +} diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java new file mode 100644 index 000000000000..8af3ce7a0010 --- /dev/null +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Auto-configuration for actuator JDBC concerns. + */ +package org.springframework.boot.actuate.autoconfigure.kafka; diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json index d82d6af79b13..893eab44503c 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -157,6 +157,12 @@ "description": "Whether to enable Neo4j health check.", "defaultValue": true }, + { + "name": "management.health.kafka.enabled", + "type": "java.lang.Boolean", + "description": "Whether to enable kafka health check.", + "defaultValue": true + }, { "name": "management.info.build.enabled", "type": "java.lang.Boolean", diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories index d2516abebdcd..6d31af900585 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories @@ -12,6 +12,7 @@ org.springframework.boot.actuate.autoconfigure.context.properties.ConfigurationP org.springframework.boot.actuate.autoconfigure.context.ShutdownEndpointAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.couchbase.CouchbaseHealthIndicatorAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.elasticsearch.ElasticsearchHealthIndicatorAutoConfiguration,\ +org.springframework.boot.actuate.autoconfigure.kafka.KafkaHealthIndicatorAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.endpoint.jmx.JmxEndpointAutoConfiguration,\ org.springframework.boot.actuate.autoconfigure.endpoint.web.WebEndpointAutoConfiguration,\ diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java new file mode 100644 index 000000000000..187287baa4ae --- /dev/null +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java @@ -0,0 +1,71 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.actuate.autoconfigure.kafka; + +import org.apache.kafka.clients.admin.AdminClient; +import org.junit.Test; + +import org.springframework.boot.actuate.autoconfigure.health.HealthIndicatorAutoConfiguration; +import org.springframework.boot.actuate.health.ApplicationHealthIndicator; +import org.springframework.boot.actuate.kafka.KafkaHealthIndicator; +import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.autoconfigure.AutoConfigureBefore; +import org.springframework.boot.test.context.runner.ApplicationContextRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; + +/** + * Tests for {@link KafkaHealthIndicatorAutoConfiguration}. + * + * @author Juan Rada + */ +public class KafkaHealthIndicatorAutoConfigurationTests { + + private ApplicationContextRunner contextRunner = new ApplicationContextRunner() + .withConfiguration(AutoConfigurations.of(KafkaConfiguration.class, + KafkaHealthIndicatorAutoConfiguration.class, + HealthIndicatorAutoConfiguration.class)); + + @Test + public void runShouldCreateIndicator() { + this.contextRunner.run((context) -> assertThat(context) + .hasSingleBean(KafkaHealthIndicator.class) + .doesNotHaveBean(ApplicationHealthIndicator.class)); + } + + @Test + public void runWhenDisabledShouldNotCreateIndicator() { + this.contextRunner.withPropertyValues("management.health.kafka.enabled:false") + .run((context) -> assertThat(context) + .doesNotHaveBean(KafkaHealthIndicator.class) + .hasSingleBean(ApplicationHealthIndicator.class)); + } + + @Configuration + @AutoConfigureBefore(KafkaHealthIndicatorAutoConfiguration.class) + protected static class KafkaConfiguration { + + @Bean + public AdminClient kafkaAdminClient() { + return mock(AdminClient.class); + } + + } +} diff --git a/spring-boot-project/spring-boot-actuator/pom.xml b/spring-boot-project/spring-boot-actuator/pom.xml index 9d5e6a65f4ea..dc0eff6e41f2 100644 --- a/spring-boot-project/spring-boot-actuator/pom.xml +++ b/spring-boot-project/spring-boot-actuator/pom.xml @@ -161,6 +161,11 @@ spring-rabbit true + + org.springframework.kafka + spring-kafka + true + org.springframework.data spring-data-cassandra diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java new file mode 100644 index 000000000000..0f2f7017d1be --- /dev/null +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java @@ -0,0 +1,57 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.actuate.kafka; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeClusterOptions; +import org.apache.kafka.clients.admin.DescribeClusterResult; + +import org.springframework.boot.actuate.health.AbstractHealthIndicator; +import org.springframework.boot.actuate.health.Health.Builder; +import org.springframework.boot.actuate.health.HealthIndicator; +import org.springframework.util.Assert; + +/** + * {@link HealthIndicator} for Kafka cluster. + * + * @author Juan Rada + */ +public class KafkaHealthIndicator extends AbstractHealthIndicator { + + private final AdminClient adminClient; + private final DescribeClusterOptions describeOptions; + + /** + * Create a new {@link KafkaHealthIndicator} instance. + * + * @param adminClient the kafka admin client + * @param responseTimeout the describe cluster request timeout in milliseconds + */ + public KafkaHealthIndicator(AdminClient adminClient, long responseTimeout) { + Assert.notNull(adminClient, "KafkaAdmin must not be null"); + + this.adminClient = adminClient; + this.describeOptions = new DescribeClusterOptions().timeoutMs((int) responseTimeout); + } + + @Override + protected void doHealthCheck(Builder builder) throws Exception { + DescribeClusterResult result = this.adminClient.describeCluster(this.describeOptions); + builder.up().withDetail("clusterId", result.clusterId().get()); + } +} + diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java new file mode 100644 index 000000000000..a3f39ede4997 --- /dev/null +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Actuator support for AMQP and RabbitMQ. + */ +package org.springframework.boot.actuate.kafka; diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTest.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTest.java new file mode 100644 index 000000000000..a0b422e3a8ac --- /dev/null +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTest.java @@ -0,0 +1,88 @@ +/* + * Copyright 2012-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.boot.actuate.kafka; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeClusterOptions; +import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.common.KafkaFuture; +import org.assertj.core.data.MapEntry; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.Status; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.verify; + +/** + * Test for {@link KafkaHealthIndicator} + */ +public class KafkaHealthIndicatorTest { + + private static final Long RESPONSE_TIME = 10L; + private static final String CLUSTER_ID = "abc_123"; + + @Mock + private AdminClient adminClient; + + @Mock + private DescribeClusterResult describeClusterResult; + + @Mock + private KafkaFuture clusterIdFuture; + + @Captor + private ArgumentCaptor describeOptionsCaptor; + + private KafkaHealthIndicator healthIndicator; + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + this.healthIndicator = new KafkaHealthIndicator(this.adminClient, RESPONSE_TIME); + given(this.describeClusterResult.clusterId()).willReturn(this.clusterIdFuture); + } + + @Test + public void kafkaIsUp() throws Exception { + given(this.adminClient.describeCluster(any(DescribeClusterOptions.class))) + .willReturn(this.describeClusterResult); + given(this.clusterIdFuture.get()).willReturn(CLUSTER_ID); + Health health = this.healthIndicator.health(); + assertThat(health.getStatus()).isEqualTo(Status.UP); + assertThat(health.getDetails()).containsOnly(MapEntry.entry("clusterId", CLUSTER_ID)); + verify(this.adminClient).describeCluster(this.describeOptionsCaptor.capture()); + assertThat(this.describeOptionsCaptor.getValue().timeoutMs()).isEqualTo(RESPONSE_TIME.intValue()); + } + + @Test + public void kafkaIsDown() { + given(this.adminClient.describeCluster(any(DescribeClusterOptions.class))) + .willThrow(new IllegalStateException("test, expected")); + Health health = this.healthIndicator.health(); + assertThat(health.getStatus()).isEqualTo(Status.DOWN); + assertThat((String) health.getDetails().get("error")).contains("test, expected"); + } +} diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index cf6c4dd34c55..1112c31479f7 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -18,6 +18,8 @@ import java.io.IOException; +import org.apache.kafka.clients.admin.AdminClient; + import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; @@ -138,4 +140,10 @@ public KafkaAdmin kafkaAdmin() { return kafkaAdmin; } + @Bean + @ConditionalOnMissingBean(AdminClient.class) + public AdminClient adminClient() { + return AdminClient.create(this.properties.buildAdminProperties()); + } + } From e5c3c3f4f276201a64d21852b36104b11126add0 Mon Sep 17 00:00:00 2001 From: Juan Camilo Rada Date: Fri, 5 Jan 2018 11:01:03 +0000 Subject: [PATCH 2/6] GH-11435: Add Kafka health indicator - fixed typo --- .../org/springframework/boot/actuate/kafka/package-info.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java index a3f39ede4997..1ed6f13e253c 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/package-info.java @@ -15,6 +15,6 @@ */ /** - * Actuator support for AMQP and RabbitMQ. + * Actuator support for Kafka. */ package org.springframework.boot.actuate.kafka; From 2a1cb28dc4fae956ab374f954c67b0ea7afa7d5a Mon Sep 17 00:00:00 2001 From: Juan Camilo Rada Date: Fri, 5 Jan 2018 11:40:45 +0000 Subject: [PATCH 3/6] GH-11435: Add Kafka health indicator - code review changes --- .../kafka/KafkaHealthIndicatorProperties.java | 2 +- .../kafka/KafkaHealthIndicatorAutoConfigurationTests.java | 2 -- ...hIndicatorTest.java => KafkaHealthIndicatorTests.java} | 4 +++- .../boot/autoconfigure/kafka/KafkaAutoConfiguration.java | 8 -------- 4 files changed, 4 insertions(+), 12 deletions(-) rename spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/{KafkaHealthIndicatorTest.java => KafkaHealthIndicatorTests.java} (97%) diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java index 52994e3740fb..0f619d00adf7 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorProperties.java @@ -22,7 +22,7 @@ import org.springframework.boot.context.properties.ConfigurationProperties; /** - * External configuration properties for {@link KafkaHealthIndicator}. + * Configuration properties for {@link KafkaHealthIndicator}. * * @author Juan Rada */ diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java index 187287baa4ae..ab0a6da7dbe1 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java @@ -23,7 +23,6 @@ import org.springframework.boot.actuate.health.ApplicationHealthIndicator; import org.springframework.boot.actuate.kafka.KafkaHealthIndicator; import org.springframework.boot.autoconfigure.AutoConfigurations; -import org.springframework.boot.autoconfigure.AutoConfigureBefore; import org.springframework.boot.test.context.runner.ApplicationContextRunner; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -59,7 +58,6 @@ public void runWhenDisabledShouldNotCreateIndicator() { } @Configuration - @AutoConfigureBefore(KafkaHealthIndicatorAutoConfiguration.class) protected static class KafkaConfiguration { @Bean diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTest.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java similarity index 97% rename from spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTest.java rename to spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java index a0b422e3a8ac..1d537246ba12 100644 --- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTest.java +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java @@ -38,8 +38,10 @@ /** * Test for {@link KafkaHealthIndicator} + * + * @author Juan Rada */ -public class KafkaHealthIndicatorTest { +public class KafkaHealthIndicatorTests { private static final Long RESPONSE_TIME = 10L; private static final String CLUSTER_ID = "abc_123"; diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java index 1112c31479f7..cf6c4dd34c55 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfiguration.java @@ -18,8 +18,6 @@ import java.io.IOException; -import org.apache.kafka.clients.admin.AdminClient; - import org.springframework.beans.factory.ObjectProvider; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; @@ -140,10 +138,4 @@ public KafkaAdmin kafkaAdmin() { return kafkaAdmin; } - @Bean - @ConditionalOnMissingBean(AdminClient.class) - public AdminClient adminClient() { - return AdminClient.create(this.properties.buildAdminProperties()); - } - } From c0510f7881c4a3a763a3ed36e1cc55dfea815911 Mon Sep 17 00:00:00 2001 From: Juan Camilo Rada Date: Mon, 8 Jan 2018 12:27:14 +0000 Subject: [PATCH 4/6] GH-11435: Add Kafka health indicator - refactor code to create and destroy admin client when health monitoring --- ...KafkaHealthIndicatorAutoConfiguration.java | 13 ++-- .../autoconfigure/kafka/package-info.java | 2 +- ...HealthIndicatorAutoConfigurationTests.java | 17 +---- .../spring-boot-actuator/pom.xml | 10 +++ .../actuate/kafka/KafkaHealthIndicator.java | 21 ++++--- .../kafka/KafkaHealthIndicatorTests.java | 63 +++++++------------ 6 files changed, 54 insertions(+), 72 deletions(-) diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java index d72756c4caae..8b5ecbf14712 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfiguration.java @@ -19,8 +19,6 @@ import java.time.Duration; import java.util.Map; -import org.apache.kafka.clients.admin.AdminClient; - import org.springframework.boot.actuate.autoconfigure.health.CompositeHealthIndicatorConfiguration; import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator; import org.springframework.boot.actuate.autoconfigure.health.HealthIndicatorAutoConfiguration; @@ -35,6 +33,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.KafkaAdmin; /** * {@link EnableAutoConfiguration Auto-configuration} for {@link KafkaHealthIndicator}. @@ -48,16 +47,16 @@ public class KafkaHealthIndicatorAutoConfiguration { @Configuration - @ConditionalOnBean(AdminClient.class) + @ConditionalOnBean(KafkaAdmin.class) @EnableConfigurationProperties(KafkaHealthIndicatorProperties.class) static class KafkaClientHealthIndicatorConfiguration extends - CompositeHealthIndicatorConfiguration { + CompositeHealthIndicatorConfiguration { - private final Map admins; + private final Map admins; private final KafkaHealthIndicatorProperties properties; - KafkaClientHealthIndicatorConfiguration(Map admins, + KafkaClientHealthIndicatorConfiguration(Map admins, KafkaHealthIndicatorProperties properties) { this.admins = admins; this.properties = properties; @@ -70,7 +69,7 @@ public HealthIndicator kafkaHealthIndicator() { } @Override - protected KafkaHealthIndicator createHealthIndicator(AdminClient source) { + protected KafkaHealthIndicator createHealthIndicator(KafkaAdmin source) { Duration responseTimeout = this.properties.getResponseTimeout(); return new KafkaHealthIndicator(source, diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java index 8af3ce7a0010..57e9377fc620 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/main/java/org/springframework/boot/actuate/autoconfigure/kafka/package-info.java @@ -15,6 +15,6 @@ */ /** - * Auto-configuration for actuator JDBC concerns. + * Auto-configuration for actuator kafka support. */ package org.springframework.boot.actuate.autoconfigure.kafka; diff --git a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java index ab0a6da7dbe1..41be4c994a19 100644 --- a/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java +++ b/spring-boot-project/spring-boot-actuator-autoconfigure/src/test/java/org/springframework/boot/actuate/autoconfigure/kafka/KafkaHealthIndicatorAutoConfigurationTests.java @@ -16,19 +16,16 @@ package org.springframework.boot.actuate.autoconfigure.kafka; -import org.apache.kafka.clients.admin.AdminClient; import org.junit.Test; import org.springframework.boot.actuate.autoconfigure.health.HealthIndicatorAutoConfiguration; import org.springframework.boot.actuate.health.ApplicationHealthIndicator; import org.springframework.boot.actuate.kafka.KafkaHealthIndicator; import org.springframework.boot.autoconfigure.AutoConfigurations; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.test.context.runner.ApplicationContextRunner; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.mock; /** * Tests for {@link KafkaHealthIndicatorAutoConfiguration}. @@ -38,7 +35,7 @@ public class KafkaHealthIndicatorAutoConfigurationTests { private ApplicationContextRunner contextRunner = new ApplicationContextRunner() - .withConfiguration(AutoConfigurations.of(KafkaConfiguration.class, + .withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class, KafkaHealthIndicatorAutoConfiguration.class, HealthIndicatorAutoConfiguration.class)); @@ -56,14 +53,4 @@ public void runWhenDisabledShouldNotCreateIndicator() { .doesNotHaveBean(KafkaHealthIndicator.class) .hasSingleBean(ApplicationHealthIndicator.class)); } - - @Configuration - protected static class KafkaConfiguration { - - @Bean - public AdminClient kafkaAdminClient() { - return mock(AdminClient.class); - } - - } } diff --git a/spring-boot-project/spring-boot-actuator/pom.xml b/spring-boot-project/spring-boot-actuator/pom.xml index dc0eff6e41f2..45927ea71586 100644 --- a/spring-boot-project/spring-boot-actuator/pom.xml +++ b/spring-boot-project/spring-boot-actuator/pom.xml @@ -246,6 +246,11 @@ spring-boot-test-support test + + org.springframework.kafka + spring-kafka-test + test + org.springframework.boot spring-boot-autoconfigure @@ -256,6 +261,11 @@ log4j-slf4j-impl test + + org.slf4j + log4j-over-slf4j + test + org.apache.logging.log4j log4j-api diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java index 0f2f7017d1be..4f8462eae707 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java @@ -23,6 +23,7 @@ import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.Health.Builder; import org.springframework.boot.actuate.health.HealthIndicator; +import org.springframework.kafka.core.KafkaAdmin; import org.springframework.util.Assert; /** @@ -32,26 +33,30 @@ */ public class KafkaHealthIndicator extends AbstractHealthIndicator { - private final AdminClient adminClient; + private final KafkaAdmin kafkaAdmin; private final DescribeClusterOptions describeOptions; /** * Create a new {@link KafkaHealthIndicator} instance. * - * @param adminClient the kafka admin client + * @param kafkaAdmin the kafka admin * @param responseTimeout the describe cluster request timeout in milliseconds */ - public KafkaHealthIndicator(AdminClient adminClient, long responseTimeout) { - Assert.notNull(adminClient, "KafkaAdmin must not be null"); + public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long responseTimeout) { + Assert.notNull(kafkaAdmin, "KafkaAdmin must not be null"); - this.adminClient = adminClient; - this.describeOptions = new DescribeClusterOptions().timeoutMs((int) responseTimeout); + this.kafkaAdmin = kafkaAdmin; + this.describeOptions = new DescribeClusterOptions() + .timeoutMs((int) responseTimeout); } @Override protected void doHealthCheck(Builder builder) throws Exception { - DescribeClusterResult result = this.adminClient.describeCluster(this.describeOptions); - builder.up().withDetail("clusterId", result.clusterId().get()); + try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfig())) { + DescribeClusterResult result = adminClient + .describeCluster(this.describeOptions); + builder.up().withDetail("clusterId", result.clusterId().get()); + } } } diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java index 1d537246ba12..a076c1658e6d 100644 --- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java @@ -16,25 +16,20 @@ package org.springframework.boot.actuate.kafka; -import org.apache.kafka.clients.admin.AdminClient; -import org.apache.kafka.clients.admin.DescribeClusterOptions; -import org.apache.kafka.clients.admin.DescribeClusterResult; -import org.apache.kafka.common.KafkaFuture; +import java.util.Collections; + +import org.apache.kafka.clients.producer.ProducerConfig; import org.assertj.core.data.MapEntry; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; -import org.mockito.ArgumentCaptor; -import org.mockito.Captor; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Status; +import org.springframework.kafka.core.KafkaAdmin; +import org.springframework.kafka.test.rule.KafkaEmbedded; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.BDDMockito.given; -import static org.mockito.Mockito.verify; /** * Test for {@link KafkaHealthIndicator} @@ -43,48 +38,34 @@ */ public class KafkaHealthIndicatorTests { - private static final Long RESPONSE_TIME = 10L; - private static final String CLUSTER_ID = "abc_123"; - - @Mock - private AdminClient adminClient; - - @Mock - private DescribeClusterResult describeClusterResult; - - @Mock - private KafkaFuture clusterIdFuture; + private static final Long RESPONSE_TIME = 1000L; - @Captor - private ArgumentCaptor describeOptionsCaptor; + @Rule + public KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true); - private KafkaHealthIndicator healthIndicator; + private KafkaAdmin kafkaAdmin; @Before public void setup() { - MockitoAnnotations.initMocks(this); - this.healthIndicator = new KafkaHealthIndicator(this.adminClient, RESPONSE_TIME); - given(this.describeClusterResult.clusterId()).willReturn(this.clusterIdFuture); + this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaEmbedded.getBrokersAsString())); } @Test - public void kafkaIsUp() throws Exception { - given(this.adminClient.describeCluster(any(DescribeClusterOptions.class))) - .willReturn(this.describeClusterResult); - given(this.clusterIdFuture.get()).willReturn(CLUSTER_ID); - Health health = this.healthIndicator.health(); + public void kafkaIsUp() { + KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME); + Health health = healthIndicator.health(); assertThat(health.getStatus()).isEqualTo(Status.UP); - assertThat(health.getDetails()).containsOnly(MapEntry.entry("clusterId", CLUSTER_ID)); - verify(this.adminClient).describeCluster(this.describeOptionsCaptor.capture()); - assertThat(this.describeOptionsCaptor.getValue().timeoutMs()).isEqualTo(RESPONSE_TIME.intValue()); + assertThat(health.getDetails()).containsOnly(MapEntry.entry( + "clusterId", this.kafkaEmbedded.getKafkaServer(0).clusterId())); } @Test - public void kafkaIsDown() { - given(this.adminClient.describeCluster(any(DescribeClusterOptions.class))) - .willThrow(new IllegalStateException("test, expected")); - Health health = this.healthIndicator.health(); + public void kafkaIsDown() throws Exception { + this.kafkaEmbedded.destroy(); + KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME); + Health health = healthIndicator.health(); assertThat(health.getStatus()).isEqualTo(Status.DOWN); - assertThat((String) health.getDetails().get("error")).contains("test, expected"); + assertThat((String) health.getDetails().get("error")).isNotEmpty(); } } From 5ce811269fb8cb7cad2a3f9b94040771d499947c Mon Sep 17 00:00:00 2001 From: Juan Camilo Rada Date: Thu, 1 Feb 2018 15:24:27 +0000 Subject: [PATCH 5/6] GH-11435: Add Kafka health indicator - added support for replication factor --- .../actuate/kafka/KafkaHealthIndicator.java | 34 ++++++++++- .../kafka/KafkaHealthIndicatorTests.java | 56 ++++++++++++++----- 2 files changed, 72 insertions(+), 18 deletions(-) diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java index 4f8462eae707..9b6929ba18cb 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java @@ -16,9 +16,16 @@ package org.springframework.boot.actuate.kafka; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutionException; + import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.Config; import org.apache.kafka.clients.admin.DescribeClusterOptions; import org.apache.kafka.clients.admin.DescribeClusterResult; +import org.apache.kafka.common.config.ConfigResource; +import org.apache.kafka.common.config.ConfigResource.Type; import org.springframework.boot.actuate.health.AbstractHealthIndicator; import org.springframework.boot.actuate.health.Health.Builder; @@ -33,6 +40,8 @@ */ public class KafkaHealthIndicator extends AbstractHealthIndicator { + static final String REPLICATION_PROPERTY = "transaction.state.log.replication.factor"; + private final KafkaAdmin kafkaAdmin; private final DescribeClusterOptions describeOptions; @@ -53,10 +62,29 @@ public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long responseTimeout) { @Override protected void doHealthCheck(Builder builder) throws Exception { try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfig())) { - DescribeClusterResult result = adminClient - .describeCluster(this.describeOptions); - builder.up().withDetail("clusterId", result.clusterId().get()); + DescribeClusterResult result = adminClient.describeCluster(this.describeOptions); + String brokerId = result.controller().get().idString(); + int replicationFactor = getReplicationFactor(brokerId, adminClient); + int nodes = result.nodes().get().size(); + if (nodes >= replicationFactor){ + builder.up(); + } + else { + builder.down(); + } + builder.withDetail("clusterId", result.clusterId().get()); + builder.withDetail("brokerId", brokerId); + builder.withDetail("nodes", nodes); } } + + private int getReplicationFactor(String brokerId, + AdminClient adminClient) throws ExecutionException, InterruptedException { + ConfigResource configResource = new ConfigResource(Type.BROKER, brokerId); + Map kafkaConfig = adminClient + .describeConfigs(Collections.singletonList(configResource)).all().get(); + Config brokerConfig = kafkaConfig.get(configResource); + return Integer.parseInt(brokerConfig.get(REPLICATION_PROPERTY).value()); + } } diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java index a076c1658e6d..a9f99f9fda22 100644 --- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java @@ -17,11 +17,9 @@ package org.springframework.boot.actuate.kafka; import java.util.Collections; +import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; -import org.assertj.core.data.MapEntry; -import org.junit.Before; -import org.junit.Rule; import org.junit.Test; import org.springframework.boot.actuate.health.Health; @@ -40,30 +38,58 @@ public class KafkaHealthIndicatorTests { private static final Long RESPONSE_TIME = 1000L; - @Rule - public KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true); - + private KafkaEmbedded kafkaEmbedded; private KafkaAdmin kafkaAdmin; - @Before - public void setup() { + private void startKafka(int replicationFactor) throws Exception { + this.kafkaEmbedded = new KafkaEmbedded(1, true); + this.kafkaEmbedded.brokerProperties(Collections.singletonMap( + KafkaHealthIndicator.REPLICATION_PROPERTY, + String.valueOf(replicationFactor))); + this.kafkaEmbedded.before(); this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap( - ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaEmbedded.getBrokersAsString())); + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, + this.kafkaEmbedded.getBrokersAsString())); + } + + private void shutdownKafka() throws Exception { + this.kafkaEmbedded.destroy(); } @Test - public void kafkaIsUp() { - KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME); + public void kafkaIsUp() throws Exception { + startKafka(1); + KafkaHealthIndicator healthIndicator = + new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME); Health health = healthIndicator.health(); assertThat(health.getStatus()).isEqualTo(Status.UP); - assertThat(health.getDetails()).containsOnly(MapEntry.entry( - "clusterId", this.kafkaEmbedded.getKafkaServer(0).clusterId())); + assertDetails(health.getDetails()); + shutdownKafka(); + } + + private void assertDetails(Map details){ + assertThat(details).containsEntry("brokerId", "0"); + assertThat(details).containsKey("clusterId"); + assertThat(details).containsEntry("nodes", 1); + } + + @Test + public void notEnoughNodesForReplicationFactor() throws Exception { + startKafka(2); + KafkaHealthIndicator healthIndicator = + new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME); + Health health = healthIndicator.health(); + assertThat(health.getStatus()).isEqualTo(Status.DOWN); + assertDetails(health.getDetails()); + shutdownKafka(); } @Test public void kafkaIsDown() throws Exception { - this.kafkaEmbedded.destroy(); - KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME); + this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:34987")); + KafkaHealthIndicator healthIndicator = + new KafkaHealthIndicator(this.kafkaAdmin, RESPONSE_TIME); Health health = healthIndicator.health(); assertThat(health.getStatus()).isEqualTo(Status.DOWN); assertThat((String) health.getDetails().get("error")).isNotEmpty(); From 7a98dc5185ee76582081d77c183faad40de6ce21 Mon Sep 17 00:00:00 2001 From: Juan Camilo Rada Date: Thu, 1 Feb 2018 15:32:06 +0000 Subject: [PATCH 6/6] GH-11435: Add Kafka health indicator - fixed coding style --- .../boot/actuate/kafka/KafkaHealthIndicator.java | 3 +-- .../boot/actuate/kafka/KafkaHealthIndicatorTests.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java index 9b6929ba18cb..8b75ff3cfed0 100644 --- a/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java +++ b/spring-boot-project/spring-boot-actuator/src/main/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicator.java @@ -53,7 +53,6 @@ public class KafkaHealthIndicator extends AbstractHealthIndicator { */ public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long responseTimeout) { Assert.notNull(kafkaAdmin, "KafkaAdmin must not be null"); - this.kafkaAdmin = kafkaAdmin; this.describeOptions = new DescribeClusterOptions() .timeoutMs((int) responseTimeout); @@ -66,7 +65,7 @@ protected void doHealthCheck(Builder builder) throws Exception { String brokerId = result.controller().get().idString(); int replicationFactor = getReplicationFactor(brokerId, adminClient); int nodes = result.nodes().get().size(); - if (nodes >= replicationFactor){ + if (nodes >= replicationFactor) { builder.up(); } else { diff --git a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java index a9f99f9fda22..15bc0403c0a6 100644 --- a/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java +++ b/spring-boot-project/spring-boot-actuator/src/test/java/org/springframework/boot/actuate/kafka/KafkaHealthIndicatorTests.java @@ -67,7 +67,7 @@ public void kafkaIsUp() throws Exception { shutdownKafka(); } - private void assertDetails(Map details){ + private void assertDetails(Map details) { assertThat(details).containsEntry("brokerId", "0"); assertThat(details).containsKey("clusterId"); assertThat(details).containsEntry("nodes", 1);