Skip to content

Commit

Permalink
feat: reduce pressure on db due to health and monitoring
Browse files Browse the repository at this point in the history
  • Loading branch information
jhaeyaert committed Jun 14, 2024
1 parent 6b1380c commit e10d8a8
Show file tree
Hide file tree
Showing 24 changed files with 1,205 additions and 321 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,40 @@
import java.util.concurrent.CompletionStage;

/**
* Represents a probe that can be evaluated and used for monitoring purpose.
*
* @author David BRASSELY (david.brassely at graviteesource.com)
* @author GraviteeSource Team
*/
public interface Probe {
/**
* The identifier of the probe (ex: 'cpu', 'memory', ...).
* @return
*/
String id();

/**
* Evaluate the probe and return a result indicating if it is healthy or not.
*
* @return a {@link CompletionStage} containing the result of the evaluation.
*/
CompletionStage<Result> check();

/**
* Indicates if the probe requests caching to avoid too many evaluations that can have an impact on performances.
* Note: the probe itself is not mandatory to implement caching. This is the responsibility of the invoker to implement the caching of the result.
*
* @return <code>true</code> if the probe requests caching, <code>false</code> otherwise. Default is <code>false</code>.
*/
default boolean isCacheable() {
return false;
}

/**
* Indicates if the probe should be evaluated and visible by default by the health check.
*
* @return <code>true</code> if the probe should be visible, <code>false</code> otherwise. Default is <code>true</code>.
*/
default boolean isVisibleByDefault() {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.gravitee.node.api.healthcheck;

import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* This registry that holds all the registered probes (see {@link ProbeManager}) and allows evaluating them in a single call.
*
* @author Jeoffrey HAEYAERT (jeoffrey.haeyaert at graviteesource.com)
* @author GraviteeSource Team
*/
public interface ProbeEvaluator {
/**
* Evaluate all the registered probes.
* In case a {@link Probe} is cacheable, the probe will be checked again only if the time elapsed since the last evaluation is above a particular threshold.
*
* @return a {@link CompletableFuture} with the map of all evaluated probes.
*/
CompletableFuture<Map<Probe, Result>> evaluate();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.gravitee.node.monitoring;

import io.gravitee.node.api.healthcheck.Probe;
import io.gravitee.node.api.healthcheck.ProbeEvaluator;
import io.gravitee.node.api.healthcheck.ProbeManager;
import io.gravitee.node.api.healthcheck.Result;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import lombok.RequiredArgsConstructor;

/**
* @author Jeoffrey HAEYAERT (jeoffrey.haeyaert at graviteesource.com)
* @author GraviteeSource Team
*/
@RequiredArgsConstructor
public class DefaultProbeEvaluator implements ProbeEvaluator {

protected static final long SAFEGUARD_DELAY = 5000L;

private final long cacheDurationMs;
private final ProbeManager probeManager;
private final Map<Probe, Result> lastProbeResults = new ConcurrentHashMap<>();
private Long lastEvaluation;

@Override
public CompletableFuture<Map<Probe, Result>> evaluate() {
final long now = Instant.now().toEpochMilli();
final long elapsedTime = lastEvaluation != null ? now - lastEvaluation : Long.MAX_VALUE;

if (elapsedTime < SAFEGUARD_DELAY) {
// Avoid too much pressure evaluating probes.
return CompletableFuture.completedFuture(lastProbeResults);
}

final List<CompletableFuture<Void>> collect =
this.probeManager.getProbes()
.stream()
.map(probe -> {
if (probe.isCacheable()) {
if (elapsedTime < cacheDurationMs && lastProbeResults.containsKey(probe)) {
// The probe has been evaluated once and the elapsed time is below the cache limit. Don't re-evaluate.
return CompletableFuture.<Void>completedFuture(null);
}
}

// Evaluate the probe and update the probe map.
return probe
.check()
.exceptionally(Result::unhealthy)
.thenAccept(result -> lastProbeResults.compute(probe, (probe1, result1) -> result))
.toCompletableFuture();
})
.toList();

// Ensure all the probes have been resolved and return all the results.
return CompletableFuture
.allOf(collect.toArray(new CompletableFuture[0]))
.thenApply(unused -> lastProbeResults)
.whenComplete((probeResultMap, throwable) -> {
if (throwable == null) {
lastEvaluation = now;
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,35 @@
*/
package io.gravitee.node.monitoring.healthcheck;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.gravitee.common.http.HttpHeaders;
import io.gravitee.common.http.HttpMethod;
import io.gravitee.common.http.HttpStatusCode;
import io.gravitee.common.http.MediaType;
import io.gravitee.node.api.healthcheck.Probe;
import io.gravitee.node.api.healthcheck.ProbeEvaluator;
import io.gravitee.node.api.healthcheck.Result;
import io.gravitee.node.management.http.endpoint.ManagementEndpoint;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.jackson.DatabindCodec;
import io.vertx.ext.web.RoutingContext;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
* @author David BRASSELY (david.brassely at graviteesource.com)
* @author GraviteeSource Team
*/
@Slf4j
@RequiredArgsConstructor
public class NodeHealthCheckManagementEndpoint implements ManagementEndpoint {

private static final Logger LOGGER = LoggerFactory.getLogger(NodeHealthCheckManagementEndpoint.class);

private NodeHealthCheckThread registry;

final ObjectMapper objectMapper;

public static final String PROBE_FILTER = "probes";

public NodeHealthCheckManagementEndpoint() {
objectMapper = DatabindCodec.prettyMapper();
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
}
private final ProbeEvaluator probeEvaluator;
private final ObjectMapper mapper;

@Override
public HttpMethod method() {
Expand All @@ -64,41 +57,39 @@ public String path() {

@Override
public void handle(RoutingContext ctx) {
Map<Probe, Result> probes = registry
.getResults()
.entrySet()
.stream()
.filter(entry ->
ctx.queryParams().contains(PROBE_FILTER)
? ctx.queryParams().get(PROBE_FILTER).contains(entry.getKey().id())
: entry.getKey().isVisibleByDefault()
)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

boolean healthyProbe = probes.values().stream().allMatch(Result::isHealthy);

HttpServerResponse response = ctx.response();
response.setStatusCode(healthyProbe ? HttpStatusCode.OK_200 : HttpStatusCode.INTERNAL_SERVER_ERROR_500);
response.putHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON);
response.setChunked(true);

Map<String, Result> results = probes
.entrySet()
.stream()
.collect(Collectors.toMap(probeResultEntry -> probeResultEntry.getKey().id(), Map.Entry::getValue));

try {
final ObjectMapper objectMapper = DatabindCodec.prettyMapper();
response.write(objectMapper.writeValueAsString(results));
} catch (JsonProcessingException e) {
LOGGER.warn("Unable to encode health check result into json.", e);
}

// End the response
response.end();
}

public void setRegistry(NodeHealthCheckThread registry) {
this.registry = registry;
probeEvaluator
.evaluate()
.thenAccept(probeResults -> {
final Map<Probe, Result> probes = probeResults
.entrySet()
.stream()
.filter(entry ->
ctx.queryParams().contains(PROBE_FILTER)
? ctx.queryParams().get(PROBE_FILTER).contains(entry.getKey().id())
: entry.getKey().isVisibleByDefault()
)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

boolean healthyProbe = probes.values().stream().allMatch(Result::isHealthy);

HttpServerResponse response = ctx.response();
response.setStatusCode(healthyProbe ? HttpStatusCode.OK_200 : HttpStatusCode.INTERNAL_SERVER_ERROR_500);
response.putHeader(HttpHeaderNames.CONTENT_TYPE, MediaType.APPLICATION_JSON);
response.setChunked(true);

Map<String, Result> results = probes
.entrySet()
.stream()
.collect(Collectors.toMap(probeResultEntry -> probeResultEntry.getKey().id(), Map.Entry::getValue));

try {
response.write(mapper.writeValueAsString(results));
} catch (JsonProcessingException e) {
log.warn("Unable to encode health check result into json.", e);
}

// End the response
response.end();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,82 +16,99 @@
package io.gravitee.node.monitoring.healthcheck;

import io.gravitee.common.service.AbstractService;
import io.gravitee.node.api.Node;
import io.gravitee.node.api.healthcheck.HealthCheck;
import io.gravitee.node.api.healthcheck.ProbeManager;
import io.gravitee.node.management.http.endpoint.ManagementEndpointManager;
import io.gravitee.node.monitoring.DefaultProbeEvaluator;
import io.gravitee.node.monitoring.eventbus.HealthCheckCodec;
import io.gravitee.node.monitoring.healthcheck.micrometer.NodeHealthCheckMicrometerHandler;
import io.gravitee.node.monitoring.spring.HealthConfiguration;
import io.gravitee.plugin.alert.AlertEventProducer;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.DeliveryOptions;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.micrometer.backends.BackendRegistries;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

/**
* @author David BRASSELY (david.brassely at graviteesource.com)
* @author GraviteeSource Team
*/
@Slf4j
@RequiredArgsConstructor
public class NodeHealthCheckService extends AbstractService {

public static final String GIO_NODE_HEALTHCHECK_BUS = "gio:node:healthcheck";

@Autowired
private ManagementEndpointManager managementEndpointManager;

@Autowired
private ProbeManager probeManager;

@Autowired
private NodeHealthCheckManagementEndpoint healthCheckEndpoint;

@Autowired
private Vertx vertx;

private long metricsPollerId = -1;

private static final long NODE_CHECKER_DELAY = 5000;
private final ManagementEndpointManager managementEndpointManager;
private final DefaultProbeEvaluator probeRegistry;
private final NodeHealthCheckManagementEndpoint healthCheckEndpoint;
private final AlertEventProducer alertEventProducer;
private final Node node;
private final Vertx vertx;
private final HealthConfiguration healthConfiguration;

private MessageProducer<HealthCheck> producer;
private ExecutorService executorService;

@Override
protected void doStart() throws Exception {
super.doStart();

producer =
vertx
.eventBus()
.registerCodec(new HealthCheckCodec())
.sender(
GIO_NODE_HEALTHCHECK_BUS,
new DeliveryOptions().setTracingPolicy(TracingPolicy.IGNORE).setCodecName(HealthCheckCodec.CODEC_NAME)
if (healthConfiguration.enabled()) {
super.doStart();

executorService = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "node-health-check"));

producer =
vertx
.eventBus()
.registerCodec(new HealthCheckCodec())
.sender(
GIO_NODE_HEALTHCHECK_BUS,
new DeliveryOptions().setTracingPolicy(TracingPolicy.IGNORE).setCodecName(HealthCheckCodec.CODEC_NAME)
);

final NodeHealthCheckThread nodeHealthCheckThread = new NodeHealthCheckThread(
probeRegistry,
alertEventProducer,
producer,
node
);

managementEndpointManager.register(healthCheckEndpoint);

MeterRegistry micrometerRegistry = BackendRegistries.getDefaultNow();

if (micrometerRegistry instanceof PrometheusMeterRegistry) {
new NodeHealthCheckMicrometerHandler(probeRegistry).bindTo(micrometerRegistry);
}

((ScheduledExecutorService) executorService).scheduleWithFixedDelay(
nodeHealthCheckThread,
0,
healthConfiguration.delay(),
healthConfiguration.unit()
);

// Poll data
NodeHealthCheckThread statusRegistry = new NodeHealthCheckThread(probeManager.getProbes(), producer);

applicationContext.getAutowireCapableBeanFactory().autowireBean(statusRegistry);

metricsPollerId = vertx.setPeriodic(NODE_CHECKER_DELAY, statusRegistry);

healthCheckEndpoint.setRegistry(statusRegistry);
managementEndpointManager.register(healthCheckEndpoint);

MeterRegistry registry = BackendRegistries.getDefaultNow();

if (registry instanceof PrometheusMeterRegistry) {
new NodeHealthCheckMicrometerHandler(statusRegistry).bindTo(registry);
log.info("Node health check scheduled with fixed delay {} {} ", healthConfiguration.delay(), healthConfiguration.unit().name());
}
}

@Override
protected void doStop() throws Exception {
super.doStop();

if (metricsPollerId > 0) {
vertx.cancelTimer(metricsPollerId);
if (executorService != null && !executorService.isShutdown()) {
log.info("Stop node health check");
executorService.shutdownNow();
} else {
log.info("Node health check already shutdown");
}

if (producer != null) {
Expand Down
Loading

0 comments on commit e10d8a8

Please sign in to comment.