Skip to content

Commit 0dbd942

Browse files
Jcamiloradasnicoll
authored andcommitted
Add Kafka health indicator
See gh-11515
1 parent 76a450d commit 0dbd942

File tree

11 files changed

+434
-0
lines changed

11 files changed

+434
-0
lines changed

spring-boot-project/spring-boot-actuator-autoconfigure/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,11 @@
193193
<artifactId>elasticsearch</artifactId>
194194
<optional>true</optional>
195195
</dependency>
196+
<dependency>
197+
<groupId>org.springframework.kafka</groupId>
198+
<artifactId>spring-kafka</artifactId>
199+
<optional>true</optional>
200+
</dependency>
196201
<dependency>
197202
<groupId>org.flywaydb</groupId>
198203
<artifactId>flyway-core</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Copyright 2012-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.actuate.autoconfigure.kafka;
18+
19+
import java.time.Duration;
20+
import java.util.Map;
21+
22+
import org.springframework.boot.actuate.autoconfigure.health.CompositeHealthIndicatorConfiguration;
23+
import org.springframework.boot.actuate.autoconfigure.health.ConditionalOnEnabledHealthIndicator;
24+
import org.springframework.boot.actuate.autoconfigure.health.HealthIndicatorAutoConfiguration;
25+
import org.springframework.boot.actuate.health.HealthIndicator;
26+
import org.springframework.boot.actuate.kafka.KafkaHealthIndicator;
27+
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
28+
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
29+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
30+
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
31+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
32+
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
33+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
34+
import org.springframework.context.annotation.Bean;
35+
import org.springframework.context.annotation.Configuration;
36+
import org.springframework.kafka.core.KafkaAdmin;
37+
38+
/**
39+
* {@link EnableAutoConfiguration Auto-configuration} for {@link KafkaHealthIndicator}.
40+
*
41+
* @author Juan Rada
42+
*/
43+
@Configuration
44+
@ConditionalOnEnabledHealthIndicator("kafka")
45+
@AutoConfigureBefore(HealthIndicatorAutoConfiguration.class)
46+
@AutoConfigureAfter(KafkaAutoConfiguration.class)
47+
public class KafkaHealthIndicatorAutoConfiguration {
48+
49+
@Configuration
50+
@ConditionalOnBean(KafkaAdmin.class)
51+
@EnableConfigurationProperties(KafkaHealthIndicatorProperties.class)
52+
static class KafkaClientHealthIndicatorConfiguration extends
53+
CompositeHealthIndicatorConfiguration<KafkaHealthIndicator, KafkaAdmin> {
54+
55+
private final Map<String, KafkaAdmin> admins;
56+
57+
private final KafkaHealthIndicatorProperties properties;
58+
59+
KafkaClientHealthIndicatorConfiguration(Map<String, KafkaAdmin> admins,
60+
KafkaHealthIndicatorProperties properties) {
61+
this.admins = admins;
62+
this.properties = properties;
63+
}
64+
65+
@Bean
66+
@ConditionalOnMissingBean(name = "kafkaHealthIndicator")
67+
public HealthIndicator kafkaHealthIndicator() {
68+
return createHealthIndicator(this.admins);
69+
}
70+
71+
@Override
72+
protected KafkaHealthIndicator createHealthIndicator(KafkaAdmin source) {
73+
Duration responseTimeout = this.properties.getResponseTimeout();
74+
75+
return new KafkaHealthIndicator(source,
76+
responseTimeout == null ? 100L : responseTimeout.toMillis());
77+
}
78+
}
79+
80+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2012-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.actuate.autoconfigure.kafka;
18+
19+
import java.time.Duration;
20+
21+
import org.springframework.boot.actuate.kafka.KafkaHealthIndicator;
22+
import org.springframework.boot.context.properties.ConfigurationProperties;
23+
24+
/**
25+
* Configuration properties for {@link KafkaHealthIndicator}.
26+
*
27+
* @author Juan Rada
28+
*/
29+
@ConfigurationProperties(prefix = "management.health.kafka", ignoreUnknownFields = false)
30+
public class KafkaHealthIndicatorProperties {
31+
32+
/**
33+
* Time to wait for a response from the cluster description operation.
34+
*/
35+
private Duration responseTimeout = Duration.ofMillis(100);
36+
37+
public Duration getResponseTimeout() {
38+
return this.responseTimeout;
39+
}
40+
41+
public void setResponseTimeout(Duration responseTimeout) {
42+
this.responseTimeout = responseTimeout;
43+
}
44+
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2012-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/**
18+
* Auto-configuration for actuator kafka support.
19+
*/
20+
package org.springframework.boot.actuate.autoconfigure.kafka;

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/additional-spring-configuration-metadata.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,12 @@
145145
"description": "Whether to enable Neo4j health check.",
146146
"defaultValue": true
147147
},
148+
{
149+
"name": "management.health.kafka.enabled",
150+
"type": "java.lang.Boolean",
151+
"description": "Whether to enable kafka health check.",
152+
"defaultValue": true
153+
},
148154
{
149155
"name": "management.info.build.enabled",
150156
"type": "java.lang.Boolean",

spring-boot-project/spring-boot-actuator-autoconfigure/src/main/resources/META-INF/spring.factories

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ org.springframework.boot.actuate.autoconfigure.context.properties.ConfigurationP
1111
org.springframework.boot.actuate.autoconfigure.context.ShutdownEndpointAutoConfiguration,\
1212
org.springframework.boot.actuate.autoconfigure.couchbase.CouchbaseHealthIndicatorAutoConfiguration,\
1313
org.springframework.boot.actuate.autoconfigure.elasticsearch.ElasticsearchHealthIndicatorAutoConfiguration,\
14+
org.springframework.boot.actuate.autoconfigure.kafka.KafkaHealthIndicatorAutoConfiguration,\
1415
org.springframework.boot.actuate.autoconfigure.endpoint.EndpointAutoConfiguration,\
1516
org.springframework.boot.actuate.autoconfigure.endpoint.jmx.JmxEndpointAutoConfiguration,\
1617
org.springframework.boot.actuate.autoconfigure.endpoint.web.WebEndpointAutoConfiguration,\
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2012-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.actuate.autoconfigure.kafka;
18+
19+
import org.junit.Test;
20+
21+
import org.springframework.boot.actuate.autoconfigure.health.HealthIndicatorAutoConfiguration;
22+
import org.springframework.boot.actuate.health.ApplicationHealthIndicator;
23+
import org.springframework.boot.actuate.kafka.KafkaHealthIndicator;
24+
import org.springframework.boot.autoconfigure.AutoConfigurations;
25+
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
26+
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
27+
28+
import static org.assertj.core.api.Assertions.assertThat;
29+
30+
/**
31+
* Tests for {@link KafkaHealthIndicatorAutoConfiguration}.
32+
*
33+
* @author Juan Rada
34+
*/
35+
public class KafkaHealthIndicatorAutoConfigurationTests {
36+
37+
private ApplicationContextRunner contextRunner = new ApplicationContextRunner()
38+
.withConfiguration(AutoConfigurations.of(KafkaAutoConfiguration.class,
39+
KafkaHealthIndicatorAutoConfiguration.class,
40+
HealthIndicatorAutoConfiguration.class));
41+
42+
@Test
43+
public void runShouldCreateIndicator() {
44+
this.contextRunner.run((context) -> assertThat(context)
45+
.hasSingleBean(KafkaHealthIndicator.class)
46+
.doesNotHaveBean(ApplicationHealthIndicator.class));
47+
}
48+
49+
@Test
50+
public void runWhenDisabledShouldNotCreateIndicator() {
51+
this.contextRunner.withPropertyValues("management.health.kafka.enabled:false")
52+
.run((context) -> assertThat(context)
53+
.doesNotHaveBean(KafkaHealthIndicator.class)
54+
.hasSingleBean(ApplicationHealthIndicator.class));
55+
}
56+
}

spring-boot-project/spring-boot-actuator/pom.xml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,11 @@
172172
<artifactId>spring-rabbit</artifactId>
173173
<optional>true</optional>
174174
</dependency>
175+
<dependency>
176+
<groupId>org.springframework.kafka</groupId>
177+
<artifactId>spring-kafka</artifactId>
178+
<optional>true</optional>
179+
</dependency>
175180
<dependency>
176181
<groupId>org.springframework.data</groupId>
177182
<artifactId>spring-data-cassandra</artifactId>
@@ -257,6 +262,11 @@
257262
<artifactId>spring-boot-test-support</artifactId>
258263
<scope>test</scope>
259264
</dependency>
265+
<dependency>
266+
<groupId>org.springframework.kafka</groupId>
267+
<artifactId>spring-kafka-test</artifactId>
268+
<scope>test</scope>
269+
</dependency>
260270
<dependency>
261271
<groupId>org.springframework.boot</groupId>
262272
<artifactId>spring-boot-autoconfigure</artifactId>
@@ -267,6 +277,11 @@
267277
<artifactId>log4j-slf4j-impl</artifactId>
268278
<scope>test</scope>
269279
</dependency>
280+
<dependency>
281+
<groupId>org.slf4j</groupId>
282+
<artifactId>log4j-over-slf4j</artifactId>
283+
<scope>test</scope>
284+
</dependency>
270285
<dependency>
271286
<groupId>org.apache.logging.log4j</groupId>
272287
<artifactId>log4j-api</artifactId>
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright 2012-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.actuate.kafka;
18+
19+
import java.util.Collections;
20+
import java.util.Map;
21+
import java.util.concurrent.ExecutionException;
22+
23+
import org.apache.kafka.clients.admin.AdminClient;
24+
import org.apache.kafka.clients.admin.Config;
25+
import org.apache.kafka.clients.admin.DescribeClusterOptions;
26+
import org.apache.kafka.clients.admin.DescribeClusterResult;
27+
import org.apache.kafka.common.config.ConfigResource;
28+
import org.apache.kafka.common.config.ConfigResource.Type;
29+
30+
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
31+
import org.springframework.boot.actuate.health.Health.Builder;
32+
import org.springframework.boot.actuate.health.HealthIndicator;
33+
import org.springframework.kafka.core.KafkaAdmin;
34+
import org.springframework.util.Assert;
35+
36+
/**
37+
* {@link HealthIndicator} for Kafka cluster.
38+
*
39+
* @author Juan Rada
40+
*/
41+
public class KafkaHealthIndicator extends AbstractHealthIndicator {
42+
43+
static final String REPLICATION_PROPERTY = "transaction.state.log.replication.factor";
44+
45+
private final KafkaAdmin kafkaAdmin;
46+
private final DescribeClusterOptions describeOptions;
47+
48+
/**
49+
* Create a new {@link KafkaHealthIndicator} instance.
50+
*
51+
* @param kafkaAdmin the kafka admin
52+
* @param responseTimeout the describe cluster request timeout in milliseconds
53+
*/
54+
public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long responseTimeout) {
55+
Assert.notNull(kafkaAdmin, "KafkaAdmin must not be null");
56+
this.kafkaAdmin = kafkaAdmin;
57+
this.describeOptions = new DescribeClusterOptions()
58+
.timeoutMs((int) responseTimeout);
59+
}
60+
61+
@Override
62+
protected void doHealthCheck(Builder builder) throws Exception {
63+
try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfig())) {
64+
DescribeClusterResult result = adminClient.describeCluster(this.describeOptions);
65+
String brokerId = result.controller().get().idString();
66+
int replicationFactor = getReplicationFactor(brokerId, adminClient);
67+
int nodes = result.nodes().get().size();
68+
if (nodes >= replicationFactor) {
69+
builder.up();
70+
}
71+
else {
72+
builder.down();
73+
}
74+
builder.withDetail("clusterId", result.clusterId().get());
75+
builder.withDetail("brokerId", brokerId);
76+
builder.withDetail("nodes", nodes);
77+
}
78+
}
79+
80+
private int getReplicationFactor(String brokerId,
81+
AdminClient adminClient) throws ExecutionException, InterruptedException {
82+
ConfigResource configResource = new ConfigResource(Type.BROKER, brokerId);
83+
Map<ConfigResource, Config> kafkaConfig = adminClient
84+
.describeConfigs(Collections.singletonList(configResource)).all().get();
85+
Config brokerConfig = kafkaConfig.get(configResource);
86+
return Integer.parseInt(brokerConfig.get(REPLICATION_PROPERTY).value());
87+
}
88+
}
89+
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Copyright 2012-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
/**
18+
* Actuator support for Kafka.
19+
*/
20+
package org.springframework.boot.actuate.kafka;

0 commit comments

Comments
 (0)