Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions spring-cloud-loadbalancer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,10 @@
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@

package org.springframework.cloud.loadbalancer.core;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerProperties;
import org.springframework.http.HttpStatus;
Expand All @@ -40,10 +41,11 @@
* {@link WebClient} to ping the <code>health</code> endpoint of the instances.
*
* @author Olga Maciaszek-Sharma
* @author Roman Matiushchenko
* @since 2.2.0
*/
public class HealthCheckServiceInstanceListSupplier
implements ServiceInstanceListSupplier {
implements ServiceInstanceListSupplier, InitializingBean, DisposableBean {

private static final Log LOG = LogFactory
.getLog(HealthCheckServiceInstanceListSupplier.class);
Expand All @@ -56,50 +58,72 @@ public class HealthCheckServiceInstanceListSupplier

private final String defaultHealthCheckPath;

private List<ServiceInstance> instances = Collections
.synchronizedList(new ArrayList<>());
private final Flux<List<ServiceInstance>> aliveInstancesReplay;

private List<ServiceInstance> healthyInstances = Collections
.synchronizedList(new ArrayList<>());
private Disposable healthCheckDisposable;

public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate,
LoadBalancerProperties.HealthCheck healthCheck, WebClient webClient) {
this.delegate = delegate;
this.healthCheck = healthCheck;
defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default",
this.defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default",
"/actuator/health");
this.webClient = webClient;
initInstances();

this.aliveInstancesReplay = Flux.defer(delegate)
.delaySubscription(Duration.ofMillis(this.healthCheck.getInitialDelay()))
.switchMap(serviceInstances -> healthCheckFlux(serviceInstances)
.map(alive -> Collections.unmodifiableList(new ArrayList<>(alive)))
)
.replay(1)
.refCount(1);
}

private void initInstances() {
delegate.get().subscribe(delegateInstances -> {
instances.clear();
instances.addAll(delegateInstances);
});

Flux<List<ServiceInstance>> healthCheckFlux = healthCheckFlux();

healthCheckFlux.subscribe(verifiedInstances -> {
healthyInstances.clear();
healthyInstances.addAll(verifiedInstances);
});
@Override
public void afterPropertiesSet() {
Disposable healthCheckDisposable = this.healthCheckDisposable;
if (healthCheckDisposable != null) {
healthCheckDisposable.dispose();
}
this.healthCheckDisposable = aliveInstancesReplay.subscribe();
}

protected Flux<List<ServiceInstance>> healthCheckFlux() {
return Flux.create(emitter -> Schedulers
.newSingle("Health Check Verifier: " + getServiceId(), true)
.schedulePeriodically(() -> {
List<ServiceInstance> verifiedInstances = new ArrayList<>();
Flux.fromIterable(instances).filterWhen(this::isAlive)
.subscribe(serviceInstance -> {
verifiedInstances.add(serviceInstance);
emitter.next(verifiedInstances);
});
}, healthCheck.getInitialDelay(), healthCheck.getInterval().toMillis(),
TimeUnit.MILLISECONDS),
FluxSink.OverflowStrategy.LATEST);
protected Flux<List<ServiceInstance>> healthCheckFlux(List<ServiceInstance> instances) {
return Flux.defer(() -> {
List<Mono<ServiceInstance>> checks = new ArrayList<>(instances.size());
for (ServiceInstance instance : instances) {
Mono<ServiceInstance> alive = isAlive(instance).onErrorResume(error -> {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"Exception occurred during health check of the instance for service %s: %s",
instance.getServiceId(), instance.getUri()), error);
}
return Mono.empty();
})
.timeout(this.healthCheck.getInterval(), Mono.defer(() -> {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"The instance for service %s: %s did not respond for %s during health check",
instance.getServiceId(), instance.getUri(),
this.healthCheck.getInterval()));
}
return Mono.empty();
}))
.handle((isHealthy, sink) -> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be replaced with .filter(isHealthy -> isHealthy)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bsideup , not really.
it can be replaced by

.filter(isHealthy -> isHealthy)
.map(it -> instance)

but is filter + map more efficient then handle? I thought handle is just for such cases

if (isHealthy) {
sink.next(instance);
}
});

checks.add(alive);
}
List<ServiceInstance> result = new ArrayList<>();
return Flux.merge(checks).map(alive -> {
result.add(alive);
return result;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as far as List is mutated should it be emitted on any update or only on first?
this could be rewritten to

List<ServiceInstance> result = new CopyOnWriteArrayList<>();
AtomicBoolean first = new AtomicBoolean(false);
return Flux.merge(checks).<List<ServiceInstance>>handle((alive, sink) -> {
	result.add(alive);
	if (first.compareAndSet(false, true)) {
		sink.next(result);
	}
})
.defaultIfEmpty(result);

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just Flux.merge(checks).collectList().defaultIfEmpty(emptyList())?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this question was addressed to my intermediate implementation which mutated healthyInstances field.
I was trying to preserve initial behavior.
.collectList() will wait until all health checks are completed, in initial implementation List was emitted on the first healthy instance presence. As far as I understand it was made due to the fact that all health checks could take a long time (in current implementation it is this.healthCheck.getInterval() but in initial it could be infinity) but it should return healthy instances as soon as possible.

})
.defaultIfEmpty(result);
})
.repeatWhen(restart -> restart.delayElements(this.healthCheck.getInterval()));
}

@Override
Expand All @@ -109,16 +133,7 @@ public String getServiceId() {

@Override
public Flux<List<ServiceInstance>> get() {
if (!healthyInstances.isEmpty()) {
return Flux.defer(() -> Flux.fromIterable(healthyInstances).collectList());
}
// If there are no healthy instances, it might be better to still retry on all of
// them
if (LOG.isWarnEnabled()) {
LOG.warn(
"No verified healthy instances were found, returning all listed instances.");
}
return Flux.defer(() -> Flux.fromIterable(instances).collectList());
return aliveInstancesReplay;
}

protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
Expand All @@ -130,7 +145,17 @@ protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
.uri(UriComponentsBuilder.fromUri(serviceInstance.getUri())
.path(healthCheckPath).build().toUri())
.exchange()
.map(clientResponse -> HttpStatus.OK.equals(clientResponse.statusCode()));
.flatMap(clientResponse -> clientResponse.releaseBody()
.thenReturn(HttpStatus.OK.value() == clientResponse.rawStatusCode())
);
}

@Override
public void destroy() {
Disposable healthCheckDisposable = this.healthCheckDisposable;
if (healthCheckDisposable != null) {
healthCheckDisposable.dispose();
}
}

}
Loading