-
Notifications
You must be signed in to change notification settings - Fork 41.6k
Improve CassandraHealthIndicator with more robust mechanism #23041
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
a678b45
ab809e3
ca04c9e
09eaca8
0f9ee8d
6ab3e81
ccb7d89
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,10 +16,12 @@ | |
|
|
||
| package org.springframework.boot.actuate.cassandra; | ||
|
|
||
| import com.datastax.oss.driver.api.core.ConsistencyLevel; | ||
| import java.util.Collection; | ||
| import java.util.Objects; | ||
|
|
||
| import com.datastax.oss.driver.api.core.CqlSession; | ||
| import com.datastax.oss.driver.api.core.cql.Row; | ||
| import com.datastax.oss.driver.api.core.cql.SimpleStatement; | ||
| import com.datastax.oss.driver.api.core.metadata.Node; | ||
| import com.datastax.oss.driver.api.core.metadata.NodeState; | ||
|
|
||
| import org.springframework.boot.actuate.health.AbstractHealthIndicator; | ||
| import org.springframework.boot.actuate.health.Health; | ||
|
|
@@ -31,13 +33,11 @@ | |
| * Cassandra data stores. | ||
| * | ||
| * @author Alexandre Dutra | ||
| * @author Tomasz Lelek | ||
| * @since 2.4.0 | ||
| */ | ||
| public class CassandraDriverHealthIndicator extends AbstractHealthIndicator { | ||
|
|
||
| private static final SimpleStatement SELECT = SimpleStatement | ||
| .newInstance("SELECT release_version FROM system.local").setConsistencyLevel(ConsistencyLevel.LOCAL_ONE); | ||
|
|
||
| private final CqlSession session; | ||
|
|
||
| /** | ||
|
|
@@ -52,11 +52,18 @@ public CassandraDriverHealthIndicator(CqlSession session) { | |
|
|
||
| @Override | ||
| protected void doHealthCheck(Health.Builder builder) throws Exception { | ||
| Row row = this.session.execute(SELECT).one(); | ||
| builder.up(); | ||
| if (row != null && !row.isNull(0)) { | ||
| builder.withDetail("version", row.getString(0)); | ||
| Collection<Node> nodes = this.session.getMetadata().getNodes().values(); | ||
| boolean atLeastOneUp = nodes.stream().map(Node::getState).anyMatch((state) -> state == NodeState.UP); | ||
| if (atLeastOneUp) { | ||
|
||
| builder.up(); | ||
| } | ||
| else { | ||
| builder.down(); | ||
| } | ||
|
|
||
| // fill details with version of the first node (if the version is not null) | ||
| nodes.stream().map(Node::getCassandraVersion).filter(Objects::nonNull).findFirst() | ||
| .ifPresent((version) -> builder.withDetail("version", version)); | ||
|
||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,8 +16,13 @@ | |
|
|
||
| package org.springframework.boot.actuate.cassandra; | ||
|
|
||
| import com.datastax.oss.driver.api.core.ConsistencyLevel; | ||
| import com.datastax.oss.driver.api.core.cql.SimpleStatement; | ||
| import java.util.Collection; | ||
| import java.util.Objects; | ||
|
|
||
| import com.datastax.oss.driver.api.core.metadata.Metadata; | ||
| import com.datastax.oss.driver.api.core.metadata.Node; | ||
| import com.datastax.oss.driver.api.core.metadata.NodeState; | ||
| import com.datastax.oss.driver.api.core.session.Session; | ||
|
|
||
| import org.springframework.boot.actuate.health.AbstractHealthIndicator; | ||
| import org.springframework.boot.actuate.health.Health; | ||
|
|
@@ -31,13 +36,11 @@ | |
| * | ||
| * @author Julien Dubois | ||
| * @author Alexandre Dutra | ||
| * @author Tomasz Lelek | ||
|
||
| * @since 2.0.0 | ||
| */ | ||
| public class CassandraHealthIndicator extends AbstractHealthIndicator { | ||
|
|
||
| private static final SimpleStatement SELECT = SimpleStatement | ||
| .newInstance("SELECT release_version FROM system.local").setConsistencyLevel(ConsistencyLevel.LOCAL_ONE); | ||
|
|
||
| private CassandraOperations cassandraOperations; | ||
|
|
||
| public CassandraHealthIndicator() { | ||
|
|
@@ -56,8 +59,23 @@ public CassandraHealthIndicator(CassandraOperations cassandraOperations) { | |
|
|
||
| @Override | ||
| protected void doHealthCheck(Health.Builder builder) throws Exception { | ||
| String version = this.cassandraOperations.getCqlOperations().queryForObject(SELECT, String.class); | ||
| builder.up().withDetail("version", version); | ||
| Metadata metadata = this.cassandraOperations.getCqlOperations().execute(Session::getMetadata); | ||
| if (metadata == null) { | ||
| throw new IllegalStateException("The CqlSession metadata was null; cannot perform health check."); | ||
| } | ||
| Collection<Node> nodes = metadata.getNodes().values(); | ||
| boolean atLeastOneUp = nodes.stream().map(Node::getState).anyMatch((state) -> state == NodeState.UP); | ||
| if (atLeastOneUp) { | ||
| builder.up(); | ||
| } | ||
| else { | ||
| builder.down(); | ||
| } | ||
|
|
||
| // fill details with version of the first node (if the version is not null) | ||
| nodes.stream().map(Node::getCassandraVersion).filter(Objects::nonNull).findFirst() | ||
| .ifPresent((version) -> builder.withDetail("version", version)); | ||
|
|
||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,27 +15,32 @@ | |
| */ | ||
| package org.springframework.boot.actuate.cassandra; | ||
|
|
||
| import com.datastax.oss.driver.api.core.ConsistencyLevel; | ||
| import com.datastax.oss.driver.api.core.cql.SimpleStatement; | ||
| import java.util.Collection; | ||
| import java.util.Objects; | ||
|
|
||
| import com.datastax.oss.driver.api.core.metadata.Metadata; | ||
| import com.datastax.oss.driver.api.core.metadata.Node; | ||
| import com.datastax.oss.driver.api.core.metadata.NodeState; | ||
| import com.datastax.oss.driver.internal.core.context.DefaultDriverContext; | ||
| import reactor.core.publisher.Mono; | ||
|
|
||
| import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator; | ||
| import org.springframework.boot.actuate.health.Health; | ||
| import org.springframework.boot.actuate.health.Health.Builder; | ||
| import org.springframework.boot.actuate.health.ReactiveHealthIndicator; | ||
| import org.springframework.data.cassandra.core.ReactiveCassandraOperations; | ||
| import org.springframework.data.cassandra.core.cql.ReactiveSessionCallback; | ||
| import org.springframework.util.Assert; | ||
|
|
||
| /** | ||
| * A {@link ReactiveHealthIndicator} for Cassandra. | ||
| * | ||
| * @author Artsiom Yudovin | ||
| * @author Tomasz Lelek | ||
|
||
| * @since 2.1.0 | ||
| */ | ||
| public class CassandraReactiveHealthIndicator extends AbstractReactiveHealthIndicator { | ||
|
|
||
| private static final SimpleStatement SELECT = SimpleStatement | ||
| .newInstance("SELECT release_version FROM system.local").setConsistencyLevel(ConsistencyLevel.LOCAL_ONE); | ||
|
|
||
| private final ReactiveCassandraOperations reactiveCassandraOperations; | ||
|
|
||
| /** | ||
|
|
@@ -50,8 +55,30 @@ public CassandraReactiveHealthIndicator(ReactiveCassandraOperations reactiveCass | |
|
|
||
| @Override | ||
| protected Mono<Health> doHealthCheck(Health.Builder builder) { | ||
| return this.reactiveCassandraOperations.getReactiveCqlOperations().queryForObject(SELECT, String.class) | ||
| .map((version) -> builder.up().withDetail("version", version).build()).single(); | ||
|
|
||
| return this.reactiveCassandraOperations.getReactiveCqlOperations().execute(extractMetadata()).single() | ||
| .map((metadata) -> buildHealth(builder, metadata)); | ||
| } | ||
|
|
||
| protected Health buildHealth(Builder builder, Metadata metadata) { | ||
| Collection<Node> nodes = metadata.getNodes().values(); | ||
| boolean atLeastOneUp = nodes.stream().map(Node::getState).anyMatch((state) -> state == NodeState.UP); | ||
| if (atLeastOneUp) { | ||
| builder.up(); | ||
| } | ||
| else { | ||
| builder.down(); | ||
| } | ||
|
|
||
| // fill details with version of the first node (if the version is not null) | ||
| nodes.stream().map(Node::getCassandraVersion).filter(Objects::nonNull).findFirst() | ||
| .ifPresent((version) -> builder.withDetail("version", version)); | ||
| return builder.build(); | ||
|
||
| } | ||
|
|
||
| protected ReactiveSessionCallback<Metadata> extractMetadata() { | ||
| return (session) -> Mono | ||
| .fromSupplier(() -> ((DefaultDriverContext) session.getContext()).getMetadataManager().getMetadata()); | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,26 +16,36 @@ | |
|
|
||
| package org.springframework.boot.actuate.cassandra; | ||
|
|
||
| import java.util.HashMap; | ||
| import java.util.Map; | ||
| import java.util.UUID; | ||
| import java.util.stream.Stream; | ||
|
|
||
| import com.datastax.oss.driver.api.core.CqlSession; | ||
| import com.datastax.oss.driver.api.core.DriverTimeoutException; | ||
| import com.datastax.oss.driver.api.core.cql.ResultSet; | ||
| import com.datastax.oss.driver.api.core.cql.Row; | ||
| import com.datastax.oss.driver.api.core.cql.SimpleStatement; | ||
| import com.datastax.oss.driver.api.core.Version; | ||
| import com.datastax.oss.driver.api.core.metadata.Metadata; | ||
| import com.datastax.oss.driver.api.core.metadata.Node; | ||
| import com.datastax.oss.driver.api.core.metadata.NodeState; | ||
| import org.junit.jupiter.api.Test; | ||
| import org.junit.jupiter.params.ParameterizedTest; | ||
| import org.junit.jupiter.params.provider.Arguments; | ||
| import org.junit.jupiter.params.provider.MethodSource; | ||
|
|
||
| import org.springframework.boot.actuate.health.Health; | ||
| import org.springframework.boot.actuate.health.Status; | ||
|
|
||
| import static org.assertj.core.api.Assertions.assertThat; | ||
| import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; | ||
| import static org.mockito.ArgumentMatchers.any; | ||
| import static org.mockito.BDDMockito.given; | ||
| import static org.mockito.Mockito.mock; | ||
| import static org.mockito.BDDMockito.mock; | ||
| import static org.mockito.BDDMockito.when; | ||
|
|
||
| /** | ||
| * Tests for {@link CassandraDriverHealthIndicator}. | ||
| * | ||
| * @author Alexandre Dutra | ||
| * @author Tomasz Lelek | ||
| * @since 2.4.0 | ||
| */ | ||
| class CassandraDriverHealthIndicatorTests { | ||
|
|
@@ -45,30 +55,86 @@ void createWhenCqlSessionIsNullShouldThrowException() { | |
| assertThatIllegalArgumentException().isThrownBy(() -> new CassandraDriverHealthIndicator(null)); | ||
| } | ||
|
|
||
| @ParameterizedTest | ||
| @MethodSource | ||
| void reportCassandraHealthCheck(Map<UUID, Node> nodes, Status expectedStatus) { | ||
|
||
| CqlSession session = mock(CqlSession.class); | ||
| Metadata metadata = mock(Metadata.class); | ||
| when(session.getMetadata()).thenReturn(metadata); | ||
| when(metadata.getNodes()).thenReturn(nodes); | ||
|
|
||
| CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); | ||
| Health health = healthIndicator.health(); | ||
| assertThat(health.getStatus()).isEqualTo(expectedStatus); | ||
| } | ||
|
|
||
| static Stream<Arguments> reportCassandraHealthCheck() { | ||
| Node healthyNode = mock(Node.class); | ||
| when(healthyNode.getState()).thenReturn(NodeState.UP); | ||
| Node unhealthyNode = mock(Node.class); | ||
| when(unhealthyNode.getState()).thenReturn(NodeState.DOWN); | ||
| Node unknownNode = mock(Node.class); | ||
| when(unknownNode.getState()).thenReturn(NodeState.UNKNOWN); | ||
| Node forcedDownNode = mock(Node.class); | ||
| when(forcedDownNode.getState()).thenReturn(NodeState.FORCED_DOWN); | ||
| return Stream.<Arguments>builder().add(Arguments.arguments(createNodes(healthyNode), Status.UP)) | ||
| .add(Arguments.arguments(createNodes(unhealthyNode), Status.DOWN)) | ||
| .add(Arguments.arguments(createNodes(unknownNode), Status.DOWN)) | ||
| .add(Arguments.arguments(createNodes(forcedDownNode), Status.DOWN)) | ||
| .add(Arguments.arguments(createNodes(healthyNode, unhealthyNode), Status.UP)) | ||
| .add(Arguments.arguments(createNodes(healthyNode, unknownNode), Status.UP)) | ||
| .add(Arguments.arguments(createNodes(healthyNode, forcedDownNode), Status.UP)).build(); | ||
| } | ||
|
|
||
| @Test | ||
| void healthWithCassandraUp() { | ||
| void addVersionToDetailsIfReportedNotNull() { | ||
| CqlSession session = mock(CqlSession.class); | ||
| ResultSet resultSet = mock(ResultSet.class); | ||
| Row row = mock(Row.class); | ||
| given(session.execute(any(SimpleStatement.class))).willReturn(resultSet); | ||
| given(resultSet.one()).willReturn(row); | ||
| given(row.isNull(0)).willReturn(false); | ||
| given(row.getString(0)).willReturn("1.0.0"); | ||
| Metadata metadata = mock(Metadata.class); | ||
| when(session.getMetadata()).thenReturn(metadata); | ||
| Node node = mock(Node.class); | ||
| when(node.getState()).thenReturn(NodeState.UP); | ||
| when(node.getCassandraVersion()).thenReturn(Version.V4_0_0); | ||
| when(metadata.getNodes()).thenReturn(createNodes(node)); | ||
|
|
||
| CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); | ||
| Health health = healthIndicator.health(); | ||
| assertThat(health.getStatus()).isEqualTo(Status.UP); | ||
| assertThat(health.getDetails().get("version")).isEqualTo("1.0.0"); | ||
| assertThat(health.getDetails().get("version")).isEqualTo(Version.V4_0_0); | ||
| } | ||
|
|
||
| @Test | ||
| void doNotAddVersionToDetailsIfReportedNull() { | ||
| CqlSession session = mock(CqlSession.class); | ||
| Metadata metadata = mock(Metadata.class); | ||
| when(session.getMetadata()).thenReturn(metadata); | ||
| Node node = mock(Node.class); | ||
| when(node.getState()).thenReturn(NodeState.UP); | ||
| when(metadata.getNodes()).thenReturn(createNodes(node)); | ||
|
|
||
| CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); | ||
| Health health = healthIndicator.health(); | ||
| assertThat(health.getStatus()).isEqualTo(Status.UP); | ||
| assertThat(health.getDetails().get("version")).isNull(); | ||
| } | ||
|
|
||
| @Test | ||
| void healthWithCassandraDown() { | ||
| CqlSession session = mock(CqlSession.class); | ||
| given(session.execute(any(SimpleStatement.class))).willThrow(new DriverTimeoutException("Test Exception")); | ||
| given(session.getMetadata()).willThrow(new DriverTimeoutException("Test Exception")); | ||
| CassandraDriverHealthIndicator healthIndicator = new CassandraDriverHealthIndicator(session); | ||
| Health health = healthIndicator.health(); | ||
| assertThat(health.getStatus()).isEqualTo(Status.DOWN); | ||
| assertThat(health.getDetails().get("error")) | ||
| .isEqualTo(DriverTimeoutException.class.getName() + ": Test Exception"); | ||
| } | ||
|
|
||
| private static Map<UUID, Node> createNodes(Node... nodes) { | ||
| Map<UUID, Node> nodesMap = new HashMap<>(); | ||
| for (Node n : nodes) { | ||
| nodesMap.put(UUID.randomUUID(), n); | ||
| } | ||
|
|
||
| return nodesMap; | ||
| } | ||
|
|
||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic is still duplicated twice. Do you think it's worth extracting it to a separate, shared component?