Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@

package org.springframework.cloud.loadbalancer.core;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerProperties;
import org.springframework.http.HttpStatus;
Expand All @@ -40,10 +42,11 @@
* {@link WebClient} to ping the <code>health</code> endpoint of the instances.
*
* @author Olga Maciaszek-Sharma
* @author Roman Matiushchenko
* @since 2.2.0
*/
public class HealthCheckServiceInstanceListSupplier
implements ServiceInstanceListSupplier {
implements ServiceInstanceListSupplier, DisposableBean {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried to make minimal changes to the current logic but
I've found testing of ping functionality a little bit inconvenient
HealthCheckServiceInstanceListSupplier starts ping implicitly and eventually I test just another stream which uses common state...
IMO It signals that this functionality must be either moved to a new class or just spitted into different methods.
So I would like to make HealthCheckServiceInstanceListSupplier an InitializingBean , and move start of pings out of constructor
and
make this

Flux<List<ServiceInstance>> instancesFlux = this.delegate.get().doOnNext(delegateInstances -> {
	this.instances = Collections.unmodifiableList(new ArrayList<>(delegateInstances));
});

and

Flux<List<ServiceInstance>> verifiedInstancesFlux = healthCheckFlux()
		.doOnNext((verifiedInstances -> {
			this.healthyInstances = verifiedInstances;
		}));

two distinct methods which are invoked from afterPropertiesSet()
does this make sense @spencergibb @OlgaMaciaszek


private static final Log LOG = LogFactory
.getLog(HealthCheckServiceInstanceListSupplier.class);
Expand All @@ -56,11 +59,11 @@ public class HealthCheckServiceInstanceListSupplier

private final String defaultHealthCheckPath;

private List<ServiceInstance> instances = Collections
.synchronizedList(new ArrayList<>());
private List<ServiceInstance> instances = Collections.emptyList();

private List<ServiceInstance> healthyInstances = Collections
.synchronizedList(new ArrayList<>());
private volatile List<ServiceInstance> healthyInstances = Collections.emptyList();

private Disposable healthCheckDisposable;

public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate,
LoadBalancerProperties.HealthCheck healthCheck, WebClient webClient) {
Expand All @@ -74,32 +77,33 @@ public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delega
}

private void initInstances() {
delegate.get().subscribe(delegateInstances -> {
instances.clear();
instances.addAll(delegateInstances);
});

Flux<List<ServiceInstance>> healthCheckFlux = healthCheckFlux();

healthCheckFlux.subscribe(verifiedInstances -> {
healthyInstances.clear();
healthyInstances.addAll(verifiedInstances);
});
healthCheckDisposable = delegate.get().doOnNext(delegateInstances -> {
Copy link
Contributor Author

@robotmrv robotmrv Feb 7, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not see contract about ServiceInstanceListSupplier#get() Flux
Is it finite or could potentially be infinite?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is finite (and assume that it has only one emit) what is the reason to return Flux instead of Mono from ServiceInstanceListSupplier#get?
If there is such contract maybe it is a good idea to document it, because it is not clear from current returning type

instances = Collections.unmodifiableList(new ArrayList<>(delegateInstances));
})
.thenMany(healthCheckFlux()).subscribeOn(Schedulers.parallel())
.subscribe(verifiedInstances -> healthyInstances = verifiedInstances);
}

protected Flux<List<ServiceInstance>> healthCheckFlux() {
return Flux.create(emitter -> Schedulers
.newSingle("Health Check Verifier: " + getServiceId(), true)
.schedulePeriodically(() -> {
List<ServiceInstance> verifiedInstances = new ArrayList<>();
Flux.fromIterable(instances).filterWhen(this::isAlive)
.subscribe(serviceInstance -> {
verifiedInstances.add(serviceInstance);
emitter.next(verifiedInstances);
});
}, healthCheck.getInitialDelay(), healthCheck.getInterval().toMillis(),
TimeUnit.MILLISECONDS),
FluxSink.OverflowStrategy.LATEST);
return Flux.defer(() -> {
List<ServiceInstance> result = new CopyOnWriteArrayList<>();

List<Mono<ServiceInstance>> checks = new ArrayList<>();
for (ServiceInstance instance : instances) {
Mono<ServiceInstance> alive = isAlive(instance)
.onErrorResume(throwable -> Mono.empty())
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should some logs be added here?
if so debug or warn?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, debug is my first instinct.

.timeout(healthCheck.getInterval(), Mono.empty()).filter(it -> it)
.map(check -> instance);

checks.add(alive);
}
return Flux.merge(checks).map(alive -> {
result.add(alive);
return result;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as far as List is mutated should it be emitted on any update or only on first?
this could be rewritten to

List<ServiceInstance> result = new CopyOnWriteArrayList<>();
AtomicBoolean first = new AtomicBoolean(false);
return Flux.merge(checks).<List<ServiceInstance>>handle((alive, sink) -> {
	result.add(alive);
	if (first.compareAndSet(false, true)) {
		sink.next(result);
	}
})
.defaultIfEmpty(result);

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just Flux.merge(checks).collectList().defaultIfEmpty(emptyList())?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this question was addressed to my intermediate implementation which mutated healthyInstances field.
I was trying to preserve initial behavior.
.collectList() will wait until all health checks are completed, in initial implementation List was emitted on the first healthy instance presence. As far as I understand it was made due to the fact that all health checks could take a long time (in current implementation it is this.healthCheck.getInterval() but in initial it could be infinity) but it should return healthy instances as soon as possible.

}).defaultIfEmpty(result);
})
.repeatWhen(restart -> restart.delayElements(healthCheck.getInterval()))
.delaySubscription(Duration.ofMillis(healthCheck.getInitialDelay()));
}

@Override
Expand All @@ -109,16 +113,16 @@ public String getServiceId() {

@Override
public Flux<List<ServiceInstance>> get() {
if (!healthyInstances.isEmpty()) {
return Flux.defer(() -> Flux.fromIterable(healthyInstances).collectList());
}
// If there are no healthy instances, it might be better to still retry on all of
// them
if (LOG.isWarnEnabled()) {
LOG.warn(
"No verified healthy instances were found, returning all listed instances.");
}
return Flux.defer(() -> Flux.fromIterable(instances).collectList());
return Flux.defer(() -> {
List<ServiceInstance> it = new ArrayList<>(healthyInstances);
if (it.isEmpty()) {
if (LOG.isWarnEnabled()) {
LOG.warn("No verified healthy instances were found, returning all listed instances.");
}
it = instances;
}
return Flux.just(it);
});
}

protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
Expand All @@ -130,7 +134,14 @@ protected Mono<Boolean> isAlive(ServiceInstance serviceInstance) {
.uri(UriComponentsBuilder.fromUri(serviceInstance.getUri())
.path(healthCheckPath).build().toUri())
.exchange()
.map(clientResponse -> HttpStatus.OK.equals(clientResponse.statusCode()));
.flatMap(clientResponse -> clientResponse.releaseBody()
.thenReturn(HttpStatus.OK.equals(clientResponse.statusCode()))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should non standard codes be taken into account?
if so - it is better to use ClientResponse.rawStatusCode()

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

);
}

@Override
public void destroy() {
this.healthCheckDisposable.dispose();
}

}