Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ subprojects { Project subproject ->
exclude module:'groovy-all'
}
testImplementation "io.micronaut.test:micronaut-test-spock:$micronautTestVersion"
}
}
}

apply plugin: "io.micronaut.build.docs"
Expand Down
11 changes: 9 additions & 2 deletions cassandra/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@ dependencies {
annotationProcessor "io.micronaut.docs:micronaut-docs-asciidoc-config-props:$micronautDocsVersion"

implementation 'io.micronaut:micronaut-inject'
api "com.datastax.oss:java-driver-core:4.8.0"
api "com.datastax.oss:java-driver-mapper-processor:4.8.0"

api "com.datastax.oss:java-driver-core:$datastaxCassandraDriverVersion"
api "com.datastax.oss:java-driver-mapper-processor:$datastaxCassandraDriverVersion"

compileOnly "io.micronaut:micronaut-management"

testImplementation 'io.micronaut:micronaut-inject-groovy'
testImplementation("org.spockframework:spock-core:$spockVersion") {
exclude module: 'groovy-all'
}
testImplementation "io.micronaut.test:micronaut-test-spock:$micronautTestVersion"
testImplementation "org.testcontainers:spock:1.14.3"
testImplementation "org.testcontainers:cassandra:1.14.3"
testImplementation "io.micronaut:micronaut-management"
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,16 @@ public CassandraSessionFactory(PropertyResolver propertyResolver) {
*/
@EachBean(CassandraConfiguration.class)
public CqlSessionBuilder session(CassandraConfiguration configuration) {

try {
CqlSessionBuilder builder = CqlSession.builder().withConfigLoader(new DefaultDriverConfigLoader(() -> {
return CqlSession.builder().withConfigLoader(new DefaultDriverConfigLoader(() -> {
ConfigFactory.invalidateCaches();
String prefix = configuration.getName();
return ConfigFactory.parseMap(this.resolver.getProperties(CassandraConfiguration.PREFIX + "." + prefix, StringConvention.RAW)).withFallback(ConfigFactory.load().getConfig(DefaultDriverConfigLoader.DEFAULT_ROOT_PATH));
}));
return builder;
} catch (Exception e) {
LOG.error("Failed to instantiate CQL session: " + e.getMessage(), e);
LOG.error(String.format("Failed to instantiate CQL session: %s", e.getMessage()), e);
throw e;
}

}

/**
Expand Down Expand Up @@ -102,7 +99,7 @@ public void close() {
sess.close();
} catch (Exception e) {
if (LOG.isWarnEnabled()) {
LOG.warn("Error closing data source [" + sess + "]: " + e.getMessage(), e);
LOG.warn(String.format("Error closing data source [%s]: %s", sess, e.getMessage()), e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.cassandra.health;

import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.CqlSession;

import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.NodeState;
import io.micronaut.context.annotation.Requires;
import io.micronaut.health.HealthStatus;
import io.micronaut.management.endpoint.health.HealthEndpoint;
import io.micronaut.management.health.indicator.AbstractHealthIndicator;

import javax.inject.Singleton;
import java.net.InetSocketAddress;
import java.util.*;

/**
* A {@link io.micronaut.management.health.indicator.HealthIndicator} for Cassandra.
*
* @author Ilkin Ashrafli
* @since 2.2.0
*/
@Requires(property = HealthEndpoint.PREFIX + ".cassandra.enabled", notEquals = "false")
@Requires(beans = HealthEndpoint.class)
@Singleton
public class CassandraHealthIndicator extends AbstractHealthIndicator<Map<String, Object>> {

private final CqlSession cqlSession;

/**
* Default constructor.
*
* @param cqlSession The The cassandra {@link CqlSession} to query for details
*/
public CassandraHealthIndicator(final CqlSession cqlSession) {
this.cqlSession = cqlSession;
}

@Override
protected Map<String, Object> getHealthInformation() {
Map<String, Object> detail = new LinkedHashMap<>();
Map<UUID, Node> nodes = cqlSession.getMetadata().getNodes();
detail.put("session", cqlSession.isClosed() ? "CLOSED" : "OPEN");
Optional<String> opClusterName = cqlSession.getMetadata().getClusterName();
if (opClusterName.isPresent()) {
detail.put("cluster_name", opClusterName.get());
}
Optional<CqlIdentifier> opKeyspace = cqlSession.getKeyspace();
if (opKeyspace.isPresent()) {
detail.put("keyspace", opKeyspace.get());
}
detail.put("nodes_count", nodes.keySet().size());

Map<UUID, Map<String, Object>> nodesMap = new HashMap<>();
Map<NodeState, Integer> nodeStateMap = new EnumMap<>(NodeState.class);
boolean up = false;
int i = 0;
for (Map.Entry<UUID, Node> entry : nodes.entrySet()) {
UUID uuid = entry.getKey();
Node node = entry.getValue();
nodeStateMap.merge(node.getState(), 1, (a, b) -> a + b);
if (node.getState() == NodeState.UP) {
up = true;
}
if (i++ < 10) {
Map<String, Object> nodeMap = new HashMap<>();
Optional<InetSocketAddress> opBroadcastAddress = node.getBroadcastAddress();
if (opBroadcastAddress.isPresent()) {
nodeMap.put("broadcast_address", opBroadcastAddress.get().getAddress());
}
nodeMap.put("endpoint", node.getEndPoint());
nodeMap.put("state", node.getState());
nodeMap.put("distance", node.getDistance());
nodeMap.put("open_connections", node.getOpenConnections());
nodeMap.put("cassandra_version", node.getCassandraVersion());
nodeMap.put("datacenter", node.getDatacenter());
nodeMap.put("rack", node.getRack());
nodeMap.put("uptime_ms", node.getUpSinceMillis());
nodeMap.put("is_reconnecting", node.isReconnecting());
nodesMap.put(uuid, nodeMap);
}
}
detail.put("nodes_state", nodeStateMap);
if (nodesMap.size() > 0) {
detail.put("nodes (10 max.)", nodesMap);
}
healthStatus = up ? HealthStatus.UP : HealthStatus.DOWN;
return detail;
}

@Override
protected String getName() {
return "cassandra";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Classes related to performing health checks for Cassandra.
*
* @author Ilkin Ashrafli
* @since 2.2.0
*/
package io.micronaut.configuration.cassandra.health;
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright 2017-2018 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.cassandra.health

import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.CqlSessionBuilder
import io.micronaut.configuration.cassandra.CassandraConfiguration
import io.micronaut.context.ApplicationContext
import io.micronaut.context.DefaultApplicationContext
import io.micronaut.context.env.MapPropertySource
import io.micronaut.context.event.BeanCreatedEvent
import io.micronaut.context.event.BeanCreatedEventListener
import io.micronaut.context.exceptions.NoSuchBeanException
import io.micronaut.health.HealthStatus
import io.micronaut.management.health.indicator.HealthResult
import io.reactivex.Single
import org.testcontainers.containers.CassandraContainer
import spock.lang.Specification

import javax.inject.Singleton

/**
* @author Ilkin Ashrafli
* @since 2.2.0
*/
class CassandraHealthIndicatorSpec extends Specification {

void "test cassandra health indicator"() {
given:
CassandraContainer cassandraContainer = new CassandraContainer()
cassandraContainer.start()

// tag::single[]
ApplicationContext applicationContext = new DefaultApplicationContext("test")
applicationContext.environment.addPropertySource(MapPropertySource.of(
'test',
['cassandra.default.basic.contact-points' : ["localhost:$cassandraContainer.firstMappedPort"],
'cassandra.default.advanced.metadata.schema.enabled' : false,
'cassandra.default.basic.load-balancing-policy.local-datacenter': 'datacenter1']
))
applicationContext.start()
// end::single[]


expect:
!applicationContext.getBean(CqlSessionBuilderListener).invoked
applicationContext.containsBean(CassandraConfiguration)
applicationContext.containsBean(CqlSession)

when:
CassandraHealthIndicator healthIndicator = applicationContext.getBean(CassandraHealthIndicator)
applicationContext.getBean(CqlSessionBuilderListener).invoked
HealthResult result = Single.fromPublisher(healthIndicator.result).blockingGet()

then:
result.status == HealthStatus.UP
Map<String, Object> detailsMap = (Map<String, Object>) result.details
detailsMap.containsKey("nodes_count")
detailsMap.containsKey("nodes_state")
detailsMap.get("session").toString().startsWith("OPEN")

when:
cassandraContainer.stop()
result = Single.fromPublisher(healthIndicator.result).blockingGet()

then:
result.status == HealthStatus.DOWN

cleanup:
applicationContext.close()
}

void "test that CassandraHealthIndicator is not created when the endpoints.health.cassandra.enabled is set to false"() {
given:
// tag::single[]
ApplicationContext applicationContext = new DefaultApplicationContext("test")
applicationContext.environment.addPropertySource(MapPropertySource.of(
'test',
['endpoints.health.cassandra.enabled': false]
))
applicationContext.start()
// end::single[]

when:
applicationContext.getBean(CassandraHealthIndicator)


then:
thrown(NoSuchBeanException)

cleanup:
applicationContext.close()
}

@Singleton
static class CqlSessionBuilderListener implements BeanCreatedEventListener<CqlSessionBuilder> {
boolean invoked = false

@Override
CqlSessionBuilder onCreated(BeanCreatedEvent<CqlSessionBuilder> event) {
def builder = event.getBean()
invoked = builder != null
return builder
}
}
}
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ micronautDocsVersion=1.0.23
micronautBuildVersion=1.1.5
micronautVersion=1.3.5
micronautTestVersion=1.2.2
datastaxCassandraDriverVersion=4.8.0
projectDesc=Provides integration between Micronaut and Cassandra
projectUrl=http://micronaut.io
spockVersion=1.3-groovy-2.5
Expand Down
37 changes: 37 additions & 0 deletions src/main/docs/guide/health.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
When the `cassandra` module is activated a api:io.micronaut.configuration.cassandra.health.CassandraHealthIndicator[] is
activated resulting in the `/health` endpoint and https://docs.micronaut.io/latest/api/io/micronaut/health/CurrentHealthStatus.html[CurrentHealthStatus]
interface resolving the health of the Cassandra cluster.

[source,json]
----
"cassandra": {
"status": "UP",
"details": {
"session": "OPEN",
"cluster_name": "Test Cluster",
"nodes_count": 1,
"nodes_state": {
"UP": 1
}
"nodes (10 max.)": {
"4a9e6e53-7eed-4c1f-b497-4f93715b2b04": {
"endpoint": "localhost/0:0:0:0:0:0:0:1:32834"
"broadcast_address": "/172.17.0.3",
"state": "UP",
"distance": "LOCAL",
"open_connections": "2",
"cassandra_version": "3.11.2",
"datacenter": "datacenter1",
"rack": "rack1",
"uptime_ms": "1593462452206",
"is_reconnecting": "false"
}
}
}
}
----


TIP: To disable the Cassandra health indicator entirely, add `endpoints.health.cassandra.enabled: false`.

See the section on the https://docs.micronaut.io/latest/guide/index.html#healthEndpoint[Health Endpoint] for more information.
1 change: 1 addition & 0 deletions src/main/docs/guide/toc.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
introduction: Introduction
setup: Setting up Cassandra
health: Health Checks
additional: Additional Notes
repository: Repository