Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions docs/src/main/asciidoc/_configprops.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,6 @@
|spring.cloud.discovery.client.health-indicator.enabled | true |
|spring.cloud.discovery.client.health-indicator.include-description | false |
|spring.cloud.discovery.client.simple.instances | |
|spring.cloud.discovery.client.simple.local.instance-id | | The unique identifier or name for the service instance.
|spring.cloud.discovery.client.simple.local.metadata | | Metadata for the service instance. Can be used by discovery clients to modify their behaviour per instance, e.g. when load balancing.
|spring.cloud.discovery.client.simple.local.service-id | | The identifier or name for the service. Multiple instances might share the same service ID.
|spring.cloud.discovery.client.simple.local.uri | | The URI of the service instance. Will be parsed to extract the scheme, host, and port.
|spring.cloud.discovery.client.simple.order | |
|spring.cloud.discovery.enabled | true | Enables discovery client health indicators.
|spring.cloud.features.enabled | true | Enables the features endpoint.
Expand All @@ -34,6 +30,9 @@
|spring.cloud.loadbalancer.health-check.initial-delay | 0 | Initial delay value for the HealthCheck scheduler.
|spring.cloud.loadbalancer.health-check.interval | 25s | Interval for rerunning the HealthCheck scheduler.
|spring.cloud.loadbalancer.health-check.path | |
|spring.cloud.loadbalancer.health-check.refetch-instances | false | Indicates whether the instances should be refetched by the <code>HealthCheckServiceInstanceListSupplier</code>. This can be used if the instances can be updated and the underlying delegate does not provide an ongoing flux.
|spring.cloud.loadbalancer.health-check.refetch-instances-interval | 25s | Interval for refetching available service instances.
|spring.cloud.loadbalancer.health-check.repeat-health-check | true | Indicates whether health checks should keep repeating. It might be useful to set it to <code>false</code> if periodically refetching the instances, as every refetch will also trigger a healthcheck.
|spring.cloud.loadbalancer.retry.enabled | true |
|spring.cloud.loadbalancer.retry.max-retries-on-next-service-instance | 1 | Number of retries to be executed on the next <code>ServiceInstance</code>. A <code>ServiceInstance</code> is chosen before each retry call.
|spring.cloud.loadbalancer.retry.max-retries-on-same-service-instance | 0 | Number of retries to be executed on the same <code>ServiceInstance</code>.
Expand Down
12 changes: 9 additions & 3 deletions docs/src/main/asciidoc/spring-cloud-commons.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -953,16 +953,22 @@ TIP: This mechanism is particularly helpful while using the `SimpleDiscoveryClie
clients backed by an actual Service Registry, it's not necessary to use, as we already get
healthy instances after querying the external ServiceDiscovery.

TIP:: This supplier is also recommended for setups with a small number of instances per service
TIP: This supplier is also recommended for setups with a small number of instances per service
in order to avoid retrying calls on a failing instance.

WARNING: If using any of the Service Discovery-backed suppliers, adding this health-check mechanism is usually not necessary, as we retrieve the health state of the instances directly
from the Service Registry.

TIP: The `HealthCheckServiceInstanceListSupplier` relies on having updated instances provided by a delegate flux. In the rare cases when you want to use a delegate that does not refresh the instances, even though the list of instances may change (such as the `ReactiveDiscoveryClientServiceInstanceListSupplier` provided by us), you can set `spring.cloud.loadbalancer.health-check.refetch-instances` to `true` to have the instance list refreshed by the `HealthCheckServiceInstanceListSupplier`. You can then also adjust the refretch intervals by modifying the value of `spring.cloud.loadbalancer.health-check.refetch-instances-interval` and opt to disable the additional healthcheck repetitions by setting `spring.cloud.loadbalancer.repeat-health-check` to `fasle` as every instances refetch
will also trigger a healthcheck.

`HealthCheckServiceInstanceListSupplier` uses properties prefixed with
`spring.cloud.loadbalancer.health-check`. You can set the `initialDelay` and `interval`
for the scheduler. You can set the default path for the healthcheck URL by setting
the value of the `spring.cloud.loadbalancer.health-check.path.default`. You can also set a specific value
for any given service by setting the value of the `spring.cloud.loadbalancer.health-check.path.[SERVICE_ID]`, substituting the `[SERVICE_ID]` with the correct ID of your service. If the path is not set, `/actuator/health` is used by default.

TIP:: If you rely on the default path (`/actuator/health`), make sure you add `spring-boot-starter-actuator` to your collaborator's dependencies, unless you are planning to add such an endpoint on your own.
TIP: If you rely on the default path (`/actuator/health`), make sure you add `spring-boot-starter-actuator` to your collaborator's dependencies, unless you are planning to add such an endpoint on your own.

In order to use the health-check scheduler approach, you will have to instantiate a `HealthCheckServiceInstanceListSupplier` bean in a <<custom-loadbalancer-configuration,custom configuration>>.

Expand All @@ -987,7 +993,7 @@ public class CustomLoadBalancerConfiguration {
}
----

NOTE:: `HealthCheckServiceInstanceListSupplier` has its own caching mechanism based on Reactor Flux `replay()`, therefore, if it's being used, you may want to skip wrapping that supplier with `CachingServiceInstanceListSupplier`.
WARNING: `HealthCheckServiceInstanceListSupplier` has its own caching mechanism based on Reactor Flux `replay()`, therefore, if it's being used, you may want to skip wrapping that supplier with `CachingServiceInstanceListSupplier`.


[[spring-cloud-loadbalancer-starter]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,44 @@ public static class HealthCheck {
*/
private Duration interval = Duration.ofSeconds(25);

/**
* Interval for refetching available service instances.
*/
private Duration refetchInstancesInterval = Duration.ofSeconds(25);

private Map<String, String> path = new LinkedCaseInsensitiveMap<>();

/**
* Indicates whether the instances should be refetched by the
* <code>HealthCheckServiceInstanceListSupplier</code>. This can be used if the
* instances can be updated and the underlying delegate does not provide an
* ongoing flux.
*/
private boolean refetchInstances = false;

/**
* Indicates whether health checks should keep repeating. It might be useful to
* set it to <code>false</code> if periodically refetching the instances, as every
* refetch will also trigger a healthcheck.
*/
private boolean repeatHealthCheck = true;

public boolean getRefetchInstances() {
return refetchInstances;
}

public void setRefetchInstances(boolean refetchInstances) {
this.refetchInstances = refetchInstances;
}

public boolean getRepeatHealthCheck() {
return repeatHealthCheck;
}

public void setRepeatHealthCheck(boolean repeatHealthCheck) {
this.repeatHealthCheck = repeatHealthCheck;
}

public int getInitialDelay() {
return initialDelay;
}
Expand All @@ -66,6 +102,14 @@ public void setInitialDelay(int initialDelay) {
this.initialDelay = initialDelay;
}

public Duration getRefetchInstancesInterval() {
return refetchInstancesInterval;
}

public void setRefetchInstancesInterval(Duration refetchInstancesInterval) {
this.refetchInstancesInterval = refetchInstancesInterval;
}

public Map<String, String> getPath() {
return path;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.retry.Repeat;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
Expand Down Expand Up @@ -64,14 +65,19 @@ public class HealthCheckServiceInstanceListSupplier
public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate,
LoadBalancerProperties.HealthCheck healthCheck, WebClient webClient) {
super(delegate);
this.healthCheck = healthCheck;
defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default",
"/actuator/health");
this.webClient = webClient;
aliveInstancesReplay = Flux.defer(delegate)
.delaySubscription(Duration.ofMillis(healthCheck.getInitialDelay()))
this.healthCheck = healthCheck;
Repeat<Object> aliveInstancesReplayRepeat = Repeat
.onlyIf(repeatContext -> this.healthCheck.getRefetchInstances())
.fixedBackoff(healthCheck.getRefetchInstancesInterval());
Flux<List<ServiceInstance>> aliveInstancesFlux = Flux.defer(delegate)
.switchMap(serviceInstances -> healthCheckFlux(serviceInstances).map(
alive -> Collections.unmodifiableList(new ArrayList<>(alive))))
.repeatWhen(aliveInstancesReplayRepeat);
aliveInstancesReplay = aliveInstancesFlux
.delaySubscription(Duration.ofMillis(healthCheck.getInitialDelay()))
.replay(1).refCount(1);
}

Expand All @@ -86,6 +92,9 @@ public void afterPropertiesSet() {

protected Flux<List<ServiceInstance>> healthCheckFlux(
List<ServiceInstance> instances) {
Repeat<Object> healthCheckFluxRepeat = Repeat
.onlyIf(repeatContext -> healthCheck.getRepeatHealthCheck())
.fixedBackoff(healthCheck.getInterval());
return Flux.defer(() -> {
List<Mono<ServiceInstance>> checks = new ArrayList<>(instances.size());
for (ServiceInstance instance : instances) {
Expand Down Expand Up @@ -117,7 +126,7 @@ protected Flux<List<ServiceInstance>> healthCheckFlux(
result.add(alive);
return result;
}).defaultIfEmpty(result);
}).repeatWhen(restart -> restart.delayElements(healthCheck.getInterval()));
}).repeatWhen(healthCheckFluxRepeat);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.cloud.loadbalancer.core;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -47,6 +48,8 @@
import org.springframework.web.reactive.function.client.WebClient;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

/**
* Tests for {@link HealthCheckServiceInstanceListSupplier}.
Expand Down Expand Up @@ -79,7 +82,7 @@ void setUp() {
}

@AfterEach
void tearDown() throws Exception {
void tearDown() {
if (listSupplier != null) {
listSupplier.destroy();
listSupplier = null;
Expand Down Expand Up @@ -140,14 +143,14 @@ void shouldReturnOnlyAliveService() {
SERVICE_ID, "127.0.0.2", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(
Flux.just(Lists.list(serviceInstance1, serviceInstance2)));

HealthCheckServiceInstanceListSupplier mock = Mockito
.mock(HealthCheckServiceInstanceListSupplier.class);
HealthCheckServiceInstanceListSupplier mock = mock(
HealthCheckServiceInstanceListSupplier.class);
Mockito.doReturn(Mono.just(true)).when(mock).isAlive(serviceInstance1);
Mockito.doReturn(Mono.just(false)).when(mock).isAlive(serviceInstance2);

Expand Down Expand Up @@ -176,14 +179,14 @@ void shouldEmitOnEachAliveServiceInBatch() {
SERVICE_ID, "127.0.0.2", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(
Flux.just(Lists.list(serviceInstance1, serviceInstance2)));

HealthCheckServiceInstanceListSupplier mock = Mockito
.mock(HealthCheckServiceInstanceListSupplier.class);
HealthCheckServiceInstanceListSupplier mock = mock(
HealthCheckServiceInstanceListSupplier.class);
Mockito.doReturn(Mono.just(true)).when(mock).isAlive(serviceInstance1);
Mockito.doReturn(Mono.just(true)).when(mock).isAlive(serviceInstance2);

Expand Down Expand Up @@ -213,14 +216,14 @@ void shouldNotFailIfIsAliveReturnsError() {
SERVICE_ID, "127.0.0.2", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(
Flux.just(Lists.list(serviceInstance1, serviceInstance2)));

HealthCheckServiceInstanceListSupplier mock = Mockito
.mock(HealthCheckServiceInstanceListSupplier.class);
HealthCheckServiceInstanceListSupplier mock = mock(
HealthCheckServiceInstanceListSupplier.class);
Mockito.doReturn(Mono.just(true)).when(mock).isAlive(serviceInstance1);
Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(mock)
.isAlive(serviceInstance2);
Expand Down Expand Up @@ -250,8 +253,8 @@ void shouldEmitAllInstancesIfAllIsAliveChecksFailed() {
SERVICE_ID, "127.0.0.2", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(
Flux.just(Lists.list(serviceInstance1, serviceInstance2)));
Expand Down Expand Up @@ -282,8 +285,8 @@ void shouldMakeInitialDaleyAfterPropertiesSet() {
SERVICE_ID, "127.0.0.1", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get())
.thenReturn(Flux.just(Lists.list(serviceInstance1)));
Expand Down Expand Up @@ -314,14 +317,14 @@ void shouldRepeatIsAliveChecksIndefinitely() {
SERVICE_ID, "127.0.0.2", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get()).thenReturn(
Flux.just(Lists.list(serviceInstance1, serviceInstance2)));

HealthCheckServiceInstanceListSupplier mock = Mockito
.mock(HealthCheckServiceInstanceListSupplier.class);
HealthCheckServiceInstanceListSupplier mock = mock(
HealthCheckServiceInstanceListSupplier.class);
Mockito.doReturn(Mono.just(false), Mono.just(true)).when(mock)
.isAlive(serviceInstance1);
Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(mock)
Expand Down Expand Up @@ -352,14 +355,14 @@ void shouldTimeoutIsAliveCheck() {
SERVICE_ID, "127.0.0.1", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get())
.thenReturn(Flux.just(Lists.list(serviceInstance1)));

HealthCheckServiceInstanceListSupplier mock = Mockito
.mock(HealthCheckServiceInstanceListSupplier.class);
HealthCheckServiceInstanceListSupplier mock = mock(
HealthCheckServiceInstanceListSupplier.class);
Mockito.when(mock.isAlive(serviceInstance1)).thenReturn(Mono.never(),
Mono.just(true));

Expand Down Expand Up @@ -391,8 +394,8 @@ void shouldUpdateInstances() {
SERVICE_ID, "127.0.0.2", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Flux<List<ServiceInstance>> instances = Flux
.just(Lists.list(serviceInstance1))
Expand Down Expand Up @@ -421,6 +424,39 @@ protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
.verify(VERIFY_TIMEOUT);
}

@Test
void shouldRefetchInstances() {
healthCheck.setInitialDelay(1000);
healthCheck.setRepeatHealthCheck(false);
healthCheck.setRefetchInstancesInterval(Duration.ofSeconds(1));
healthCheck.setRefetchInstances(true);
ServiceInstance serviceInstance1 = new DefaultServiceInstance("ignored-service-1",
SERVICE_ID, "127.0.0.1", port, false);
ServiceInstance serviceInstance2 = new DefaultServiceInstance("ignored-service-2",
SERVICE_ID, "127.0.0.2", port, false);

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
when(delegate.get())
.thenReturn(Flux.just(Collections.singletonList(serviceInstance1)))
.thenReturn(Flux.just(Collections.singletonList(serviceInstance2)));
listSupplier = new HealthCheckServiceInstanceListSupplier(delegate,
healthCheck, webClient) {
@Override
protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
return Mono.just(true);
}
};
return listSupplier.get();
}).expectSubscription()
.expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()))
.expectNext(Lists.list(serviceInstance1))
.thenAwait(healthCheck.getRefetchInstancesInterval())
.expectNext(Lists.list(serviceInstance2)).thenCancel()
.verify(VERIFY_TIMEOUT);
}

@Test
void shouldCacheResultIfAfterPropertiesSetInvoked() {
healthCheck.setInitialDelay(1000);
Expand All @@ -430,8 +466,8 @@ void shouldCacheResultIfAfterPropertiesSetInvoked() {
AtomicInteger emitCounter = new AtomicInteger();

StepVerifier.withVirtualTime(() -> {
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(
ServiceInstanceListSupplier.class);
Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID);
Mockito.when(delegate.get())
.thenReturn(Flux.just(Lists.list(serviceInstance1)));
Expand Down Expand Up @@ -468,8 +504,7 @@ void shouldCancelSubscription() {

final AtomicInteger instancesCanceled = new AtomicInteger();
final AtomicBoolean subscribed = new AtomicBoolean();
ServiceInstanceListSupplier delegate = Mockito
.mock(ServiceInstanceListSupplier.class);
ServiceInstanceListSupplier delegate = mock(ServiceInstanceListSupplier.class);
Mockito.when(delegate.get())
.thenReturn(Flux.<List<ServiceInstance>>never()
.doOnSubscribe(subscription -> subscribed.set(true))
Expand Down