Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,19 @@

package org.springframework.cloud.loadbalancer.core;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerProperties;
import org.springframework.http.HttpStatus;
Expand All @@ -40,10 +41,11 @@
* {@link WebClient} to ping the <code>health</code> endpoint of the instances.
*
* @author Olga Maciaszek-Sharma
* @author Roman Matiushchenko
* @since 2.2.0
*/
public class HealthCheckServiceInstanceListSupplier
implements ServiceInstanceListSupplier {
implements ServiceInstanceListSupplier, DisposableBean {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried to make minimal changes to the current logic but
I've found testing of ping functionality a little bit inconvenient
HealthCheckServiceInstanceListSupplier starts ping implicitly and eventually I test just another stream which uses common state...
IMO It signals that this functionality must be either moved to a new class or just spitted into different methods.
So I would like to make HealthCheckServiceInstanceListSupplier an InitializingBean , and move start of pings out of constructor
and
make this

Flux<List<ServiceInstance>> instancesFlux = this.delegate.get().doOnNext(delegateInstances -> {
	this.instances = Collections.unmodifiableList(new ArrayList<>(delegateInstances));
});

and

Flux<List<ServiceInstance>> verifiedInstancesFlux = healthCheckFlux()
		.doOnNext((verifiedInstances -> {
			this.healthyInstances = verifiedInstances;
		}));

two distinct methods which are invoked from afterPropertiesSet()
does this make sense @spencergibb @OlgaMaciaszek


private static final Log LOG = LogFactory
.getLog(HealthCheckServiceInstanceListSupplier.class);
Expand All @@ -56,50 +58,75 @@ public class HealthCheckServiceInstanceListSupplier

private final String defaultHealthCheckPath;

private List<ServiceInstance> instances = Collections
.synchronizedList(new ArrayList<>());
private volatile List<ServiceInstance> instances = Collections.emptyList();

private List<ServiceInstance> healthyInstances = Collections
.synchronizedList(new ArrayList<>());
private volatile List<ServiceInstance> healthyInstances = Collections.emptyList();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could remove state with such changes

private Flux<List<ServiceInstance>> aliveInstancesReplay;
...
//in the constructor 
this.aliveInstancesReplay = this.delegate.get()
				.switchMap(serviceInstances -> {
					return healthCheckFlux(serviceInstances).map(alive -> alive.isEmpty() ? serviceInstances : alive);
				})
				.replay(1)
				.autoConnect()
				.onBackpressureLatest();
...
protected Flux<List<ServiceInstance>> healthCheckFlux(List<ServiceInstance> instances) {
...//the same logic but uses `instances` method arg instead of class field
}
...
public void afterPropertiesSet() {
    this.healthCheckDisposable = aliveInstancesReplay.subscribe();
}

//and `get()` become just 
@Override
public Flux<List<ServiceInstance>> get() {
	return aliveInstancesReplay;
}

less state and it is easier to test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

downside - requires protected method signature change, but it was introduced recently so it is not a problem

private Disposable healthCheckDisposable;

public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate,
LoadBalancerProperties.HealthCheck healthCheck, WebClient webClient) {
this.delegate = delegate;
this.healthCheck = healthCheck;
defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default",
this.defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default",
"/actuator/health");
this.webClient = webClient;
initInstances();

}

private void initInstances() {
delegate.get().subscribe(delegateInstances -> {
instances.clear();
instances.addAll(delegateInstances);
Flux<List<ServiceInstance>> instancesFlux = this.delegate.get().doOnNext(delegateInstances -> {
this.instances = Collections.unmodifiableList(new ArrayList<>(delegateInstances));
});

Flux<List<ServiceInstance>> healthCheckFlux = healthCheckFlux();
Flux<List<ServiceInstance>> verifiedInstancesFlux = healthCheckFlux()
.doOnNext((verifiedInstances -> {
this.healthyInstances = verifiedInstances;
}));

healthCheckFlux.subscribe(verifiedInstances -> {
healthyInstances.clear();
healthyInstances.addAll(verifiedInstances);
});
this.healthCheckDisposable = Flux.merge(instancesFlux.then(), verifiedInstancesFlux.then())
.subscribe();
}

protected Flux<List<ServiceInstance>> healthCheckFlux() {
return Flux.create(emitter -> Schedulers
.newSingle("Health Check Verifier: " + getServiceId(), true)
.schedulePeriodically(() -> {
List<ServiceInstance> verifiedInstances = new ArrayList<>();
Flux.fromIterable(instances).filterWhen(this::isAlive)
.subscribe(serviceInstance -> {
verifiedInstances.add(serviceInstance);
emitter.next(verifiedInstances);
});
}, healthCheck.getInitialDelay(), healthCheck.getInterval().toMillis(),
TimeUnit.MILLISECONDS),
FluxSink.OverflowStrategy.LATEST);
return Flux.defer(() -> {
List<Mono<ServiceInstance>> checks = new ArrayList<>(this.instances.size());
for (ServiceInstance instance : this.instances) {
Mono<ServiceInstance> alive = isAlive(instance)
.onErrorResume(error -> {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"Exception occurred during health check of the instance for service %s: %s",
instance.getServiceId(), instance.getUri()), error);
}
return Mono.empty();
})
.timeout(this.healthCheck.getInterval(), Mono.defer(() -> {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format(
"The instance for service %s: %s did not respond for %s during health check",
instance.getServiceId(), instance.getUri(), this.healthCheck.getInterval()));
}
return Mono.empty();
}))
.handle((isHealthy, sink) -> {
if (isHealthy) {
sink.next(instance);
}
});

checks.add(alive);
}
List<ServiceInstance> result = new CopyOnWriteArrayList<>();
return Flux.merge(checks).map(alive -> {
result.add(alive);
return result;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as far as List is mutated should it be emitted on any update or only on first?
this could be rewritten to

List<ServiceInstance> result = new CopyOnWriteArrayList<>();
AtomicBoolean first = new AtomicBoolean(false);
return Flux.merge(checks).<List<ServiceInstance>>handle((alive, sink) -> {
	result.add(alive);
	if (first.compareAndSet(false, true)) {
		sink.next(result);
	}
})
.defaultIfEmpty(result);

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just Flux.merge(checks).collectList().defaultIfEmpty(emptyList())?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this question was addressed to my intermediate implementation which mutated healthyInstances field.
I was trying to preserve initial behavior.
.collectList() will wait until all health checks are completed, in initial implementation List was emitted on the first healthy instance presence. As far as I understand it was made due to the fact that all health checks could take a long time (in current implementation it is this.healthCheck.getInterval() but in initial it could be infinity) but it should return healthy instances as soon as possible.

})
.defaultIfEmpty(result);
})
.repeatWhen(restart -> restart.delayElements(this.healthCheck.getInterval()))
.delaySubscription(Duration.ofMillis(this.healthCheck.getInitialDelay()));
}

@Override
Expand All @@ -109,16 +136,18 @@ public String getServiceId() {

@Override
public Flux<List<ServiceInstance>> get() {
if (!healthyInstances.isEmpty()) {
return Flux.defer(() -> Flux.fromIterable(healthyInstances).collectList());
}
// If there are no healthy instances, it might be better to still retry on all of
// them
if (LOG.isWarnEnabled()) {
LOG.warn(
"No verified healthy instances were found, returning all listed instances.");
}
return Flux.defer(() -> Flux.fromIterable(instances).collectList());
return Flux.defer(() -> {
List<ServiceInstance> it = new ArrayList<>(this.healthyInstances);
if (it.isEmpty()) {
// If there are no healthy instances, it might be better to still retry on all of
// them
if (LOG.isWarnEnabled()) {
LOG.warn("No verified healthy instances were found, returning all listed instances.");
}
it = this.instances;
}
return Flux.just(it);
});
}

protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
Expand All @@ -130,7 +159,14 @@ protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
.uri(UriComponentsBuilder.fromUri(serviceInstance.getUri())
.path(healthCheckPath).build().toUri())
.exchange()
.map(clientResponse -> HttpStatus.OK.equals(clientResponse.statusCode()));
.flatMap(clientResponse -> clientResponse.releaseBody()
.thenReturn(HttpStatus.OK.value() == clientResponse.rawStatusCode())
);
}

@Override
public void destroy() {
this.healthCheckDisposable.dispose();
}

}
Loading