Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -40,6 +41,7 @@
* {@link EnableAutoConfiguration Auto-configuration} for {@link KafkaHealthIndicator}.
*
* @author Juan Rada
* @author Gary Russell
* @since 2.0.0
*/
@Configuration
Expand All @@ -56,10 +58,14 @@ public class KafkaHealthIndicatorAutoConfiguration

private final KafkaHealthIndicatorProperties properties;

private final KafkaProperties kafkaProperties;

public KafkaHealthIndicatorAutoConfiguration(Map<String, KafkaAdmin> admins,
KafkaHealthIndicatorProperties properties) {
KafkaHealthIndicatorProperties properties,
KafkaProperties kafkaProperties) {
this.admins = admins;
this.properties = properties;
this.kafkaProperties = kafkaProperties;
}

@Bean
Expand All @@ -71,7 +77,7 @@ public HealthIndicator kafkaHealthIndicator() {
@Override
protected KafkaHealthIndicator createHealthIndicator(KafkaAdmin source) {
Duration responseTimeout = this.properties.getResponseTimeout();
return new KafkaHealthIndicator(source, responseTimeout.toMillis());
return new KafkaHealthIndicator(source, this.kafkaProperties, responseTimeout.toMillis());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,46 @@
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeClusterOptions;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.ConfigResource.Type;
import org.apache.kafka.common.errors.UnsupportedVersionException;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.actuate.health.AbstractHealthIndicator;
import org.springframework.boot.actuate.health.Health.Builder;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.actuate.health.Status;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/**
* {@link HealthIndicator} for Kafka cluster.
*
* @author Juan Rada
* @author Gary Russell
* @since 2.0.0
*/
public class KafkaHealthIndicator extends AbstractHealthIndicator {
public class KafkaHealthIndicator extends AbstractHealthIndicator implements DisposableBean {

private static final Log logger = LogFactory.getLog(KafkaHealthIndicator.class);

static final String REPLICATION_PROPERTY = "transaction.state.log.replication.factor";

private final KafkaAdmin kafkaAdmin;
private static final long CLOSE_TIMEOUT = 30L;

private final AdminClient adminClient;

private final KafkaProperties kafkaProperties;

private final DescribeClusterOptions describeOptions;

Expand All @@ -54,34 +68,50 @@ public class KafkaHealthIndicator extends AbstractHealthIndicator {
* @param kafkaAdmin the kafka admin
* @param requestTimeout the request timeout in milliseconds
*/
public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, long requestTimeout) {
public KafkaHealthIndicator(KafkaAdmin kafkaAdmin, KafkaProperties kafkaProperties, long requestTimeout) {
Assert.notNull(kafkaAdmin, "KafkaAdmin must not be null");
this.kafkaAdmin = kafkaAdmin;
this.adminClient = AdminClient.create(kafkaAdmin.getConfig());
this.kafkaProperties = kafkaProperties;
this.describeOptions = new DescribeClusterOptions()
.timeoutMs((int) requestTimeout);
}

@Override
protected void doHealthCheck(Builder builder) throws Exception {
try (AdminClient adminClient = AdminClient.create(this.kafkaAdmin.getConfig())) {
DescribeClusterResult result = adminClient
.describeCluster(this.describeOptions);
String brokerId = result.controller().get().idString();
int replicationFactor = getReplicationFactor(brokerId, adminClient);
int nodes = result.nodes().get().size();
Status status = nodes >= replicationFactor ? Status.UP : Status.DOWN;
builder.status(status).withDetail("clusterId", result.clusterId().get())
.withDetail("brokerId", brokerId).withDetail("nodes", nodes);
DescribeClusterResult result = this.adminClient.describeCluster(this.describeOptions);
String brokerId = result.controller().get().idString();
int replicationFactor = getReplicationFactor(brokerId, adminClient);
int nodes = result.nodes().get().size();
Status status = nodes >= replicationFactor ? Status.UP : Status.DOWN;
builder.status(status).withDetail("clusterId", result.clusterId().get())
.withDetail("brokerId", brokerId).withDetail("nodes", nodes);
}

private int getReplicationFactor(String brokerId, AdminClient adminClient) throws Exception {
if (!StringUtils.hasText(this.kafkaProperties.getProducer().getTransactionIdPrefix())) {
return 1;
}
try {
ConfigResource configResource = new ConfigResource(Type.BROKER, brokerId);
Map<ConfigResource, Config> kafkaConfig = adminClient
.describeConfigs(Collections.singletonList(configResource)).all().get();
Config brokerConfig = kafkaConfig.get(configResource);
return Integer.parseInt(brokerConfig.get(REPLICATION_PROPERTY).value());
}
catch (ExecutionException e) {
if (e.getCause() instanceof UnsupportedVersionException) {
if (logger.isDebugEnabled()) {
logger.debug("Broker does not support obtaining replication factor, assuming 1");
}
return 1;
}
throw e;
}
}

private int getReplicationFactor(String brokerId, AdminClient adminClient)
throws ExecutionException, InterruptedException {
ConfigResource configResource = new ConfigResource(Type.BROKER, brokerId);
Map<ConfigResource, Config> kafkaConfig = adminClient
.describeConfigs(Collections.singletonList(configResource)).all().get();
Config brokerConfig = kafkaConfig.get(configResource);
return Integer.parseInt(brokerConfig.get(REPLICATION_PROPERTY).value());
@Override
public void destroy() throws Exception {
this.adminClient.close(CLOSE_TIMEOUT, TimeUnit.SECONDS);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.util.SocketUtils;
Expand All @@ -36,6 +37,7 @@
*
* @author Juan Rada
* @author Stephane Nicoll
* @author Gary Russell
*/
public class KafkaHealthIndicatorTests {

Expand All @@ -53,7 +55,7 @@ public void shutdownKafka() throws Exception {
@Test
public void kafkaIsUp() throws Exception {
startKafka(1);
KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin,
KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin, new KafkaProperties(),
1000L);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.UP);
Expand All @@ -65,7 +67,7 @@ public void kafkaIsDown() {
int freePort = SocketUtils.findAvailableTcpPort();
this.kafkaAdmin = new KafkaAdmin(Collections.singletonMap(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:" + freePort));
KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin,
KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin, new KafkaProperties(),
1L);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
Expand All @@ -75,7 +77,9 @@ public void kafkaIsDown() {
@Test
public void notEnoughNodesForReplicationFactor() throws Exception {
startKafka(2);
KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin,
KafkaProperties kafkaProperties = new KafkaProperties();
kafkaProperties.getProducer().setTransactionIdPrefix("foo-");
KafkaHealthIndicator healthIndicator = new KafkaHealthIndicator(this.kafkaAdmin, kafkaProperties,
1000L);
Health health = healthIndicator.health();
assertThat(health.getStatus()).isEqualTo(Status.DOWN);
Expand Down