From 2e553a049101929014b873975ff306e7b1cb9f1d Mon Sep 17 00:00:00 2001 From: Roman Matiushchenko Date: Fri, 7 Feb 2020 18:26:58 +0200 Subject: [PATCH 01/12] fixes issues from gh-683 - WebClient response leaks - potential Scheduled task leak - rework polling to plain reactor operators - remove non atomic operations on ServiceInstance lists related to gh-629 --- ...ealthCheckServiceInstanceListSupplier.java | 92 ++++++++++--------- 1 file changed, 50 insertions(+), 42 deletions(-) diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java index 0374a00d8..3f9865443 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java @@ -16,18 +16,20 @@ package org.springframework.cloud.loadbalancer.core; +import java.time.Duration; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import reactor.core.Disposable; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; +import org.springframework.beans.factory.DisposableBean; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerProperties; import org.springframework.http.HttpStatus; @@ -40,10 +42,11 @@ * {@link WebClient} to ping the health endpoint of the instances. * * @author Olga Maciaszek-Sharma + * @author Roman Matiushchenko * @since 2.2.0 */ public class HealthCheckServiceInstanceListSupplier - implements ServiceInstanceListSupplier { + implements ServiceInstanceListSupplier, DisposableBean { private static final Log LOG = LogFactory .getLog(HealthCheckServiceInstanceListSupplier.class); @@ -56,11 +59,11 @@ public class HealthCheckServiceInstanceListSupplier private final String defaultHealthCheckPath; - private List instances = Collections - .synchronizedList(new ArrayList<>()); + private List instances = Collections.emptyList(); - private List healthyInstances = Collections - .synchronizedList(new ArrayList<>()); + private volatile List healthyInstances = Collections.emptyList(); + + private Disposable healthCheckDisposable; public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, LoadBalancerProperties.HealthCheck healthCheck, WebClient webClient) { @@ -74,32 +77,31 @@ public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delega } private void initInstances() { - delegate.get().subscribe(delegateInstances -> { - instances.clear(); - instances.addAll(delegateInstances); - }); - - Flux> healthCheckFlux = healthCheckFlux(); - - healthCheckFlux.subscribe(verifiedInstances -> { - healthyInstances.clear(); - healthyInstances.addAll(verifiedInstances); - }); + healthCheckDisposable = delegate.get().doOnNext(delegateInstances -> { + instances = Collections.unmodifiableList(new ArrayList<>(delegateInstances)); + }).thenMany(healthCheckFlux()).subscribeOn(Schedulers.parallel()) + .subscribe(verifiedInstances -> healthyInstances = verifiedInstances); } protected Flux> healthCheckFlux() { - return Flux.create(emitter -> Schedulers - .newSingle("Health Check Verifier: " + getServiceId(), true) - .schedulePeriodically(() -> { - List verifiedInstances = new ArrayList<>(); - Flux.fromIterable(instances).filterWhen(this::isAlive) - .subscribe(serviceInstance -> { - verifiedInstances.add(serviceInstance); - emitter.next(verifiedInstances); - }); - }, healthCheck.getInitialDelay(), healthCheck.getInterval().toMillis(), - TimeUnit.MILLISECONDS), - FluxSink.OverflowStrategy.LATEST); + return Flux.defer(() -> { + List result = new CopyOnWriteArrayList<>(); + + List> checks = new ArrayList<>(); + for (ServiceInstance instance : instances) { + Mono alive = isAlive(instance) + .onErrorResume(throwable -> Mono.empty()) + .timeout(healthCheck.getInterval(), Mono.empty()).filter(it -> it) + .map(check -> instance); + + checks.add(alive); + } + return Flux.merge(checks).map(alive -> { + result.add(alive); + return result; + }).defaultIfEmpty(result); + }).repeatWhen(restart -> restart.delayElements(healthCheck.getInterval())) + .delaySubscription(Duration.ofMillis(healthCheck.getInitialDelay())); } @Override @@ -109,16 +111,17 @@ public String getServiceId() { @Override public Flux> get() { - if (!healthyInstances.isEmpty()) { - return Flux.defer(() -> Flux.fromIterable(healthyInstances).collectList()); - } - // If there are no healthy instances, it might be better to still retry on all of - // them - if (LOG.isWarnEnabled()) { - LOG.warn( - "No verified healthy instances were found, returning all listed instances."); - } - return Flux.defer(() -> Flux.fromIterable(instances).collectList()); + return Flux.defer(() -> { + List it = new ArrayList<>(healthyInstances); + if (it.isEmpty()) { + if (LOG.isWarnEnabled()) { + LOG.warn( + "No verified healthy instances were found, returning all listed instances."); + } + it = instances; + } + return Flux.just(it); + }); } protected Mono isAlive(ServiceInstance serviceInstance) { @@ -129,8 +132,13 @@ protected Mono isAlive(ServiceInstance serviceInstance) { return webClient.get() .uri(UriComponentsBuilder.fromUri(serviceInstance.getUri()) .path(healthCheckPath).build().toUri()) - .exchange() - .map(clientResponse -> HttpStatus.OK.equals(clientResponse.statusCode())); + .exchange().flatMap(clientResponse -> clientResponse.releaseBody() + .thenReturn(HttpStatus.OK.equals(clientResponse.statusCode()))); + } + + @Override + public void destroy() { + this.healthCheckDisposable.dispose(); } } From c1d1675d3e9404bab9a448986fe5feec67d5e4a8 Mon Sep 17 00:00:00 2001 From: Roman Matiushchenko Date: Fri, 7 Feb 2020 18:34:59 +0200 Subject: [PATCH 02/12] formatting --- ...ealthCheckServiceInstanceListSupplier.java | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java index 3f9865443..552f96fd3 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java @@ -79,8 +79,9 @@ public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delega private void initInstances() { healthCheckDisposable = delegate.get().doOnNext(delegateInstances -> { instances = Collections.unmodifiableList(new ArrayList<>(delegateInstances)); - }).thenMany(healthCheckFlux()).subscribeOn(Schedulers.parallel()) - .subscribe(verifiedInstances -> healthyInstances = verifiedInstances); + }) + .thenMany(healthCheckFlux()).subscribeOn(Schedulers.parallel()) + .subscribe(verifiedInstances -> healthyInstances = verifiedInstances); } protected Flux> healthCheckFlux() { @@ -100,8 +101,9 @@ protected Flux> healthCheckFlux() { result.add(alive); return result; }).defaultIfEmpty(result); - }).repeatWhen(restart -> restart.delayElements(healthCheck.getInterval())) - .delaySubscription(Duration.ofMillis(healthCheck.getInitialDelay())); + }) + .repeatWhen(restart -> restart.delayElements(healthCheck.getInterval())) + .delaySubscription(Duration.ofMillis(healthCheck.getInitialDelay())); } @Override @@ -115,8 +117,7 @@ public Flux> get() { List it = new ArrayList<>(healthyInstances); if (it.isEmpty()) { if (LOG.isWarnEnabled()) { - LOG.warn( - "No verified healthy instances were found, returning all listed instances."); + LOG.warn("No verified healthy instances were found, returning all listed instances."); } it = instances; } @@ -132,8 +133,10 @@ protected Mono isAlive(ServiceInstance serviceInstance) { return webClient.get() .uri(UriComponentsBuilder.fromUri(serviceInstance.getUri()) .path(healthCheckPath).build().toUri()) - .exchange().flatMap(clientResponse -> clientResponse.releaseBody() - .thenReturn(HttpStatus.OK.equals(clientResponse.statusCode()))); + .exchange() + .flatMap(clientResponse -> clientResponse.releaseBody() + .thenReturn(HttpStatus.OK.equals(clientResponse.statusCode())) + ); } @Override From a1b60a17a8e2bf895113becb82555320e45e1fe4 Mon Sep 17 00:00:00 2001 From: Roman Matiushchenko Date: Fri, 7 Feb 2020 21:41:52 +0200 Subject: [PATCH 03/12] add logs and handle non-standard statuses --- ...ealthCheckServiceInstanceListSupplier.java | 38 ++++++++++++++----- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java index 552f96fd3..dcea2ae03 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java @@ -80,27 +80,47 @@ private void initInstances() { healthCheckDisposable = delegate.get().doOnNext(delegateInstances -> { instances = Collections.unmodifiableList(new ArrayList<>(delegateInstances)); }) - .thenMany(healthCheckFlux()).subscribeOn(Schedulers.parallel()) + .thenMany(healthCheckFlux()) + .subscribeOn(Schedulers.parallel()) .subscribe(verifiedInstances -> healthyInstances = verifiedInstances); } protected Flux> healthCheckFlux() { return Flux.defer(() -> { - List result = new CopyOnWriteArrayList<>(); - - List> checks = new ArrayList<>(); + List> checks = new ArrayList<>(instances.size()); for (ServiceInstance instance : instances) { Mono alive = isAlive(instance) - .onErrorResume(throwable -> Mono.empty()) - .timeout(healthCheck.getInterval(), Mono.empty()).filter(it -> it) - .map(check -> instance); + .onErrorResume(error -> { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Exception occurred during health check of the instance for service %s: %s", + instance.getServiceId(), instance.getUri()), error); + } + return Mono.empty(); + }) + .timeout(healthCheck.getInterval(), Mono.fromSupplier(() -> { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "The instance for service %s: %s did not respond for %s during health check", + instance.getServiceId(), instance.getUri(), healthCheck.getInterval())); + } + //returns Completion signal if null + return null; + })) + .handle((isHealthy, sink) -> { + if (isHealthy) { + sink.next(instance); + } + }); checks.add(alive); } + List result = new CopyOnWriteArrayList<>(); return Flux.merge(checks).map(alive -> { result.add(alive); return result; - }).defaultIfEmpty(result); + }) + .defaultIfEmpty(result); }) .repeatWhen(restart -> restart.delayElements(healthCheck.getInterval())) .delaySubscription(Duration.ofMillis(healthCheck.getInitialDelay())); @@ -135,7 +155,7 @@ protected Mono isAlive(ServiceInstance serviceInstance) { .path(healthCheckPath).build().toUri()) .exchange() .flatMap(clientResponse -> clientResponse.releaseBody() - .thenReturn(HttpStatus.OK.equals(clientResponse.statusCode())) + .thenReturn(HttpStatus.OK.value() == clientResponse.rawStatusCode()) ); } From 5905dfb0dfb48240e4a7b4a4953f7287cdb86f55 Mon Sep 17 00:00:00 2001 From: Roman Matiushchenko Date: Fri, 7 Feb 2020 22:26:43 +0200 Subject: [PATCH 04/12] - add logs - handle non-standard statuses - instancesFlux and verifiedInstancesFlux work in parallel --- ...ealthCheckServiceInstanceListSupplier.java | 36 +++++++++++-------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java index dcea2ae03..1a3a62b1a 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java @@ -59,7 +59,7 @@ public class HealthCheckServiceInstanceListSupplier private final String defaultHealthCheckPath; - private List instances = Collections.emptyList(); + private volatile List instances = Collections.emptyList(); private volatile List healthyInstances = Collections.emptyList(); @@ -69,7 +69,7 @@ public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delega LoadBalancerProperties.HealthCheck healthCheck, WebClient webClient) { this.delegate = delegate; this.healthCheck = healthCheck; - defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default", + this.defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default", "/actuator/health"); this.webClient = webClient; initInstances(); @@ -77,18 +77,24 @@ public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delega } private void initInstances() { - healthCheckDisposable = delegate.get().doOnNext(delegateInstances -> { - instances = Collections.unmodifiableList(new ArrayList<>(delegateInstances)); - }) - .thenMany(healthCheckFlux()) - .subscribeOn(Schedulers.parallel()) - .subscribe(verifiedInstances -> healthyInstances = verifiedInstances); + Flux> instancesFlux = this.delegate.get().doOnNext(delegateInstances -> { + this.instances = Collections.unmodifiableList(new ArrayList<>(delegateInstances)); + }); + + Flux> verifiedInstancesFlux = healthCheckFlux() + .doOnNext((verifiedInstances -> { + this.healthyInstances = verifiedInstances; + })); + + this.healthCheckDisposable = Flux.merge(instancesFlux.then(), verifiedInstancesFlux.then()) + .subscribeOn(Schedulers.parallel()) + .subscribe(); } protected Flux> healthCheckFlux() { return Flux.defer(() -> { - List> checks = new ArrayList<>(instances.size()); - for (ServiceInstance instance : instances) { + List> checks = new ArrayList<>(this.instances.size()); + for (ServiceInstance instance : this.instances) { Mono alive = isAlive(instance) .onErrorResume(error -> { if (LOG.isDebugEnabled()) { @@ -98,11 +104,11 @@ protected Flux> healthCheckFlux() { } return Mono.empty(); }) - .timeout(healthCheck.getInterval(), Mono.fromSupplier(() -> { + .timeout(this.healthCheck.getInterval(), Mono.fromSupplier(() -> { if (LOG.isDebugEnabled()) { LOG.debug(String.format( "The instance for service %s: %s did not respond for %s during health check", - instance.getServiceId(), instance.getUri(), healthCheck.getInterval())); + instance.getServiceId(), instance.getUri(), this.healthCheck.getInterval())); } //returns Completion signal if null return null; @@ -134,12 +140,14 @@ public String getServiceId() { @Override public Flux> get() { return Flux.defer(() -> { - List it = new ArrayList<>(healthyInstances); + List it = new ArrayList<>(this.healthyInstances); if (it.isEmpty()) { + // If there are no healthy instances, it might be better to still retry on all of + // them if (LOG.isWarnEnabled()) { LOG.warn("No verified healthy instances were found, returning all listed instances."); } - it = instances; + it = this.instances; } return Flux.just(it); }); From 66d2cd6d72d85e744c019ea6fa173db8363bf631 Mon Sep 17 00:00:00 2001 From: Roman Matiushchenko Date: Sun, 9 Feb 2020 10:07:11 +0200 Subject: [PATCH 05/12] add tests --- ...ealthCheckServiceInstanceListSupplier.java | 7 +- ...CheckServiceInstanceListSupplierTests.java | 231 ++++++++++++++++++ 2 files changed, 233 insertions(+), 5 deletions(-) diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java index 1a3a62b1a..3e8627ae3 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java @@ -27,7 +27,6 @@ import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.scheduler.Schedulers; import org.springframework.beans.factory.DisposableBean; import org.springframework.cloud.client.ServiceInstance; @@ -87,7 +86,6 @@ private void initInstances() { })); this.healthCheckDisposable = Flux.merge(instancesFlux.then(), verifiedInstancesFlux.then()) - .subscribeOn(Schedulers.parallel()) .subscribe(); } @@ -104,14 +102,13 @@ protected Flux> healthCheckFlux() { } return Mono.empty(); }) - .timeout(this.healthCheck.getInterval(), Mono.fromSupplier(() -> { + .timeout(this.healthCheck.getInterval(), Mono.defer(() -> { if (LOG.isDebugEnabled()) { LOG.debug(String.format( "The instance for service %s: %s did not respond for %s during health check", instance.getServiceId(), instance.getUri(), this.healthCheck.getInterval())); } - //returns Completion signal if null - return null; + return Mono.empty(); })) .handle((isHealthy, sink) -> { if (isHealthy) { diff --git a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java index 8f6961a81..43d3da6e3 100644 --- a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java +++ b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java @@ -16,8 +16,18 @@ package org.springframework.cloud.loadbalancer.core; +import java.time.Duration; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.assertj.core.api.Assertions; +import org.assertj.core.util.Lists; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; @@ -39,6 +49,7 @@ * Tests for {@link HealthCheckServiceInstanceListSupplier}. * * @author Olga Maciaszek-Sharma + * @author Roman Matiushchenko */ @ExtendWith(SpringExtension.class) @SpringBootTest( @@ -52,6 +63,7 @@ class HealthCheckServiceInstanceListSupplierTests { private final WebClient webClient = WebClient.create(); private LoadBalancerProperties.HealthCheck healthCheck = new LoadBalancerProperties.HealthCheck(); + private static final String SERVICE_ID = "ignored-service"; @SuppressWarnings("ConstantConditions") @Test @@ -100,6 +112,225 @@ void shouldReturnFalseIfEndpointNotFound() { assertThat(alive).isFalse(); } + @Test + void shouldReturnOnlyAliveService() { + LoadBalancerProperties.HealthCheck healthCheck = new LoadBalancerProperties.HealthCheck(); + healthCheck.setInitialDelay(1000); + + ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", + SERVICE_ID, "127.0.0.1", port, false); + ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", + SERVICE_ID, "127.0.0.2", port, false); + + StepVerifier.withVirtualTime(() -> { + ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); + Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); + Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2))); + HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + delegate, + healthCheck, webClient); + + listSupplier = Mockito.spy(listSupplier); + Mockito.doReturn(Mono.just(true)).when(listSupplier).isAlive(si1); + Mockito.doReturn(Mono.just(false)).when(listSupplier).isAlive(si2); + return listSupplier.healthCheckFlux(); + }) + .expectSubscription() + .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay())) + .expectNext(Lists.list(si1)) + .expectNoEvent(healthCheck.getInterval()) + .thenCancel() + .verify(); + } + + @Test + void shouldEmitOnEachAliveServiceInBatch() { + ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", + SERVICE_ID, "127.0.0.1", port, false); + ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", + SERVICE_ID, "127.0.0.2", port, false); + + StepVerifier.withVirtualTime(() -> { + ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); + Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); + Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2))); + HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + delegate, + healthCheck, webClient); + + listSupplier = Mockito.spy(listSupplier); + Mockito.doReturn(Mono.just(true)).when(listSupplier).isAlive(si1); + Mockito.doReturn(Mono.just(true)).when(listSupplier).isAlive(si2); + return listSupplier.healthCheckFlux(); + }) + .expectNext(Lists.list(si1)) + .expectNext(Lists.list(si1, si2)) + .expectNoEvent(healthCheck.getInterval()) + .thenCancel() + .verify(); + } + + @Test + void shouldNotFailIfIsAliveReturnsError() { + ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", + SERVICE_ID, "127.0.0.1", port, false); + ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", + SERVICE_ID, "127.0.0.2", port, false); + + StepVerifier.withVirtualTime(() -> { + ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); + Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); + Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2))); + HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + delegate, + healthCheck, webClient); + listSupplier = Mockito.spy(listSupplier); + Mockito.doReturn(Mono.just(true)).when(listSupplier).isAlive(si1); + Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(listSupplier).isAlive(si2); + return listSupplier.healthCheckFlux(); + }) + .expectNext(Lists.list(si1)) + .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()).plus(healthCheck.getInterval())) + .thenCancel() + .verify(); + } + + @Test + void shouldEmitEmptyListIfAllIsAliveChecksFailed() { + ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", + SERVICE_ID, "127.0.0.1", port, false); + ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", + SERVICE_ID, "127.0.0.2", port, false); + + StepVerifier.withVirtualTime(() -> { + ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); + Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); + Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2))); + HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + delegate, + healthCheck, webClient); + listSupplier = Mockito.spy(listSupplier); + Mockito.doReturn(Mono.just(false)).when(listSupplier).isAlive(si1); + Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(listSupplier).isAlive(si2); + return listSupplier.healthCheckFlux(); + }) + .expectNext(Lists.list()) + .expectNoEvent(healthCheck.getInterval()) + .thenCancel() + .verify(); + } + + @Test + void shouldRepeatIsAliveChecksIndefinitely() { + ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", + SERVICE_ID, "127.0.0.1", port, false); + ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", + SERVICE_ID, "127.0.0.2", port, false); + + StepVerifier.withVirtualTime(() -> { + ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); + Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); + Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2))); + HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + delegate, + healthCheck, webClient); + listSupplier = Mockito.spy(listSupplier); + Mockito.doReturn(Mono.just(false), Mono.just(true)).when(listSupplier).isAlive(si1); + Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(listSupplier).isAlive(si2); + return listSupplier.healthCheckFlux(); + }) + .expectNext(Lists.list()) + .expectNoEvent(healthCheck.getInterval()) + .expectNext(Lists.list(si1)) + .expectNoEvent(healthCheck.getInterval()) + .expectNext(Lists.list(si1)) + .thenCancel() + .verify(); + } + + @Test + void shouldTimeoutIsAliveCheck() { + ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", + SERVICE_ID, "127.0.0.1", port, false); + + StepVerifier.withVirtualTime(() -> { + ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); + Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); + Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1))); + HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + delegate, + healthCheck, webClient); + listSupplier = Mockito.spy(listSupplier); + Mockito.doReturn(Mono.never(), Mono.just(true)) + .when(listSupplier).isAlive(si1); + return listSupplier.healthCheckFlux(); + }) + .thenAwait(Duration.ofMillis(healthCheck.getInitialDelay()).plus(healthCheck.getInterval())) + .expectNext(Lists.list()) + .expectNoEvent(healthCheck.getInterval()) + .expectNext(Lists.list(si1)) + .expectNoEvent(healthCheck.getInterval()) + .expectNext(Lists.list(si1)) + .thenCancel() + .verify(healthCheck.getInterval().multipliedBy(5)); + } + + @Test + void shouldUpdateInstances() { + ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", + SERVICE_ID, "127.0.0.1", port, false); + ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", + SERVICE_ID, "127.0.0.2", port, false); + + StepVerifier.withVirtualTime(() -> { + ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); + Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); + Flux> instances = Flux.just(Lists.list(si1)) + .concatWith(Flux.just(Lists.list(si1, si2)).delayElements(healthCheck.getInterval().dividedBy(2))); + Mockito.when(delegate.get()).thenReturn(instances); + + HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + delegate, + healthCheck, webClient) { + @Override + protected Mono isAlive(ServiceInstance serviceInstance) { + return Mono.just(true); + } + }; + + return listSupplier.healthCheckFlux(); + }) + .expectNext(Lists.list(si1)) + .expectNoEvent(healthCheck.getInterval()) + .expectNext(Lists.list(si1)) + .expectNext(Lists.list(si1, si2)) + .expectNoEvent(healthCheck.getInterval()) + .expectNext(Lists.list(si1)) + .expectNext(Lists.list(si1, si2)) + .thenCancel() + .verify(healthCheck.getInterval().multipliedBy(4)); + } + + @Test + void shouldCancelSubscription() { + + final AtomicInteger instancesCanceled = new AtomicInteger(); + + ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); + Mockito.when(delegate.get()).thenReturn(Flux.>never() + .doOnCancel(instancesCanceled::incrementAndGet)); + + HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + delegate, + healthCheck, webClient); + + Assertions.assertThat(instancesCanceled).hasValue(0); + + listSupplier.destroy(); + + Assertions.assertThat(instancesCanceled).hasValue(1); + } + @Configuration(proxyBeanMethods = false) @EnableAutoConfiguration @RestController From bceb9478760ae23c1d541b268b2b6d1ea64b4053 Mon Sep 17 00:00:00 2001 From: Roman Matiushchenko Date: Sun, 9 Feb 2020 10:13:54 +0200 Subject: [PATCH 06/12] add timeout to tests --- .../HealthCheckServiceInstanceListSupplierTests.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java index 43d3da6e3..bd460a255 100644 --- a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java +++ b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java @@ -140,7 +140,7 @@ void shouldReturnOnlyAliveService() { .expectNext(Lists.list(si1)) .expectNoEvent(healthCheck.getInterval()) .thenCancel() - .verify(); + .verify(healthCheck.getInterval().multipliedBy(4)); } @Test @@ -167,7 +167,7 @@ void shouldEmitOnEachAliveServiceInBatch() { .expectNext(Lists.list(si1, si2)) .expectNoEvent(healthCheck.getInterval()) .thenCancel() - .verify(); + .verify(healthCheck.getInterval().multipliedBy(4)); } @Test @@ -192,7 +192,7 @@ void shouldNotFailIfIsAliveReturnsError() { .expectNext(Lists.list(si1)) .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()).plus(healthCheck.getInterval())) .thenCancel() - .verify(); + .verify(healthCheck.getInterval().multipliedBy(4)); } @Test @@ -217,7 +217,7 @@ void shouldEmitEmptyListIfAllIsAliveChecksFailed() { .expectNext(Lists.list()) .expectNoEvent(healthCheck.getInterval()) .thenCancel() - .verify(); + .verify(healthCheck.getInterval().multipliedBy(4)); } @Test @@ -245,7 +245,7 @@ void shouldRepeatIsAliveChecksIndefinitely() { .expectNoEvent(healthCheck.getInterval()) .expectNext(Lists.list(si1)) .thenCancel() - .verify(); + .verify(healthCheck.getInterval().multipliedBy(4)); } @Test From 6849d9c70b6a15623eab04e8779277df5ab77753 Mon Sep 17 00:00:00 2001 From: Roman Matiushchenko Date: Sun, 9 Feb 2020 14:34:22 +0200 Subject: [PATCH 07/12] format --- .../core/HealthCheckServiceInstanceListSupplier.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java index 3e8627ae3..567a3849a 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java @@ -125,8 +125,8 @@ protected Flux> healthCheckFlux() { }) .defaultIfEmpty(result); }) - .repeatWhen(restart -> restart.delayElements(healthCheck.getInterval())) - .delaySubscription(Duration.ofMillis(healthCheck.getInitialDelay())); + .repeatWhen(restart -> restart.delayElements(this.healthCheck.getInterval())) + .delaySubscription(Duration.ofMillis(this.healthCheck.getInitialDelay())); } @Override From a1fd01a5cc4a658ecd13852853fdc275aa1507bd Mon Sep 17 00:00:00 2001 From: Roman Matiushchenko Date: Sun, 9 Feb 2020 15:17:52 +0200 Subject: [PATCH 08/12] dispose all created HealthCheckServiceInstanceListSupplier --- ...CheckServiceInstanceListSupplierTests.java | 84 ++++++++++++++----- 1 file changed, 61 insertions(+), 23 deletions(-) diff --git a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java index bd460a255..36c14389f 100644 --- a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java +++ b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java @@ -22,6 +22,8 @@ import org.assertj.core.api.Assertions; import org.assertj.core.util.Lists; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mockito; @@ -56,20 +58,36 @@ classes = HealthCheckServiceInstanceListSupplierTests.TestApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) class HealthCheckServiceInstanceListSupplierTests { + private static final String SERVICE_ID = "ignored-service"; + private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(5); @LocalServerPort private int port; private final WebClient webClient = WebClient.create(); - private LoadBalancerProperties.HealthCheck healthCheck = new LoadBalancerProperties.HealthCheck(); - private static final String SERVICE_ID = "ignored-service"; + private LoadBalancerProperties.HealthCheck healthCheck; + + private HealthCheckServiceInstanceListSupplier listSupplier; + + @BeforeEach + void setUp() { + healthCheck = new LoadBalancerProperties.HealthCheck(); + } + + @AfterEach + void tearDown() throws Exception { + if (listSupplier != null) { + listSupplier.destroy(); + listSupplier = null; + } + } @SuppressWarnings("ConstantConditions") @Test void shouldCheckInstanceWithProvidedHealthCheckPath() { healthCheck.getPath().put("ignored-service", "/health"); - HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + listSupplier = new HealthCheckServiceInstanceListSupplier( ServiceInstanceListSupplier.FixedServiceInstanceListSupplier .with(new MockEnvironment()).build(), healthCheck, webClient); @@ -84,7 +102,7 @@ void shouldCheckInstanceWithProvidedHealthCheckPath() { @SuppressWarnings("ConstantConditions") @Test void shouldCheckInstanceWithDefaultHealthCheckPath() { - HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + listSupplier = new HealthCheckServiceInstanceListSupplier( ServiceInstanceListSupplier.FixedServiceInstanceListSupplier .with(new MockEnvironment()).build(), healthCheck, webClient); @@ -100,7 +118,7 @@ void shouldCheckInstanceWithDefaultHealthCheckPath() { @Test void shouldReturnFalseIfEndpointNotFound() { healthCheck.getPath().put("ignored-service", "/test"); - HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + listSupplier = new HealthCheckServiceInstanceListSupplier( ServiceInstanceListSupplier.FixedServiceInstanceListSupplier .with(new MockEnvironment()).build(), healthCheck, webClient); @@ -114,7 +132,6 @@ void shouldReturnFalseIfEndpointNotFound() { @Test void shouldReturnOnlyAliveService() { - LoadBalancerProperties.HealthCheck healthCheck = new LoadBalancerProperties.HealthCheck(); healthCheck.setInitialDelay(1000); ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", @@ -126,7 +143,7 @@ void shouldReturnOnlyAliveService() { ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2))); - HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + listSupplier = new HealthCheckServiceInstanceListSupplier( delegate, healthCheck, webClient); @@ -140,11 +157,12 @@ void shouldReturnOnlyAliveService() { .expectNext(Lists.list(si1)) .expectNoEvent(healthCheck.getInterval()) .thenCancel() - .verify(healthCheck.getInterval().multipliedBy(4)); + .verify(VERIFY_TIMEOUT); } @Test void shouldEmitOnEachAliveServiceInBatch() { + healthCheck.setInitialDelay(1000); ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, "127.0.0.1", port, false); ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", @@ -154,7 +172,7 @@ void shouldEmitOnEachAliveServiceInBatch() { ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2))); - HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + listSupplier = new HealthCheckServiceInstanceListSupplier( delegate, healthCheck, webClient); @@ -163,15 +181,18 @@ void shouldEmitOnEachAliveServiceInBatch() { Mockito.doReturn(Mono.just(true)).when(listSupplier).isAlive(si2); return listSupplier.healthCheckFlux(); }) + .expectSubscription() + .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay())) .expectNext(Lists.list(si1)) .expectNext(Lists.list(si1, si2)) .expectNoEvent(healthCheck.getInterval()) .thenCancel() - .verify(healthCheck.getInterval().multipliedBy(4)); + .verify(VERIFY_TIMEOUT); } @Test void shouldNotFailIfIsAliveReturnsError() { + healthCheck.setInitialDelay(1000); ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, "127.0.0.1", port, false); ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", @@ -181,22 +202,26 @@ void shouldNotFailIfIsAliveReturnsError() { ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2))); - HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + listSupplier = new HealthCheckServiceInstanceListSupplier( delegate, healthCheck, webClient); + listSupplier = Mockito.spy(listSupplier); Mockito.doReturn(Mono.just(true)).when(listSupplier).isAlive(si1); Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(listSupplier).isAlive(si2); return listSupplier.healthCheckFlux(); }) + .expectSubscription() + .thenAwait(Duration.ofMillis(healthCheck.getInitialDelay())) .expectNext(Lists.list(si1)) - .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()).plus(healthCheck.getInterval())) + .expectNoEvent(healthCheck.getInterval()) .thenCancel() - .verify(healthCheck.getInterval().multipliedBy(4)); + .verify(VERIFY_TIMEOUT); } @Test void shouldEmitEmptyListIfAllIsAliveChecksFailed() { + healthCheck.setInitialDelay(1000); ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, "127.0.0.1", port, false); ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", @@ -206,22 +231,26 @@ void shouldEmitEmptyListIfAllIsAliveChecksFailed() { ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2))); - HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + listSupplier = new HealthCheckServiceInstanceListSupplier( delegate, healthCheck, webClient); + listSupplier = Mockito.spy(listSupplier); Mockito.doReturn(Mono.just(false)).when(listSupplier).isAlive(si1); Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(listSupplier).isAlive(si2); return listSupplier.healthCheckFlux(); }) + .expectSubscription() + .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay())) .expectNext(Lists.list()) .expectNoEvent(healthCheck.getInterval()) .thenCancel() - .verify(healthCheck.getInterval().multipliedBy(4)); + .verify(VERIFY_TIMEOUT); } @Test void shouldRepeatIsAliveChecksIndefinitely() { + healthCheck.setInitialDelay(1000); ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, "127.0.0.1", port, false); ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", @@ -231,25 +260,29 @@ void shouldRepeatIsAliveChecksIndefinitely() { ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2))); - HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + listSupplier = new HealthCheckServiceInstanceListSupplier( delegate, healthCheck, webClient); + listSupplier = Mockito.spy(listSupplier); Mockito.doReturn(Mono.just(false), Mono.just(true)).when(listSupplier).isAlive(si1); Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(listSupplier).isAlive(si2); return listSupplier.healthCheckFlux(); }) + .expectSubscription() + .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay())) .expectNext(Lists.list()) .expectNoEvent(healthCheck.getInterval()) .expectNext(Lists.list(si1)) .expectNoEvent(healthCheck.getInterval()) .expectNext(Lists.list(si1)) .thenCancel() - .verify(healthCheck.getInterval().multipliedBy(4)); + .verify(VERIFY_TIMEOUT); } @Test void shouldTimeoutIsAliveCheck() { + healthCheck.setInitialDelay(1000); ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, "127.0.0.1", port, false); @@ -257,26 +290,29 @@ void shouldTimeoutIsAliveCheck() { ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1))); - HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + listSupplier = new HealthCheckServiceInstanceListSupplier( delegate, healthCheck, webClient); + listSupplier = Mockito.spy(listSupplier); Mockito.doReturn(Mono.never(), Mono.just(true)) .when(listSupplier).isAlive(si1); return listSupplier.healthCheckFlux(); }) - .thenAwait(Duration.ofMillis(healthCheck.getInitialDelay()).plus(healthCheck.getInterval())) + .expectSubscription() + .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()).plus(healthCheck.getInterval())) .expectNext(Lists.list()) .expectNoEvent(healthCheck.getInterval()) .expectNext(Lists.list(si1)) .expectNoEvent(healthCheck.getInterval()) .expectNext(Lists.list(si1)) .thenCancel() - .verify(healthCheck.getInterval().multipliedBy(5)); + .verify(VERIFY_TIMEOUT); } @Test void shouldUpdateInstances() { + healthCheck.setInitialDelay(1000); ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, "127.0.0.1", port, false); ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", @@ -289,7 +325,7 @@ void shouldUpdateInstances() { .concatWith(Flux.just(Lists.list(si1, si2)).delayElements(healthCheck.getInterval().dividedBy(2))); Mockito.when(delegate.get()).thenReturn(instances); - HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + listSupplier = new HealthCheckServiceInstanceListSupplier( delegate, healthCheck, webClient) { @Override @@ -300,6 +336,8 @@ protected Mono isAlive(ServiceInstance serviceInstance) { return listSupplier.healthCheckFlux(); }) + .expectSubscription() + .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay())) .expectNext(Lists.list(si1)) .expectNoEvent(healthCheck.getInterval()) .expectNext(Lists.list(si1)) @@ -308,7 +346,7 @@ protected Mono isAlive(ServiceInstance serviceInstance) { .expectNext(Lists.list(si1)) .expectNext(Lists.list(si1, si2)) .thenCancel() - .verify(healthCheck.getInterval().multipliedBy(4)); + .verify(VERIFY_TIMEOUT); } @Test @@ -320,7 +358,7 @@ void shouldCancelSubscription() { Mockito.when(delegate.get()).thenReturn(Flux.>never() .doOnCancel(instancesCanceled::incrementAndGet)); - HealthCheckServiceInstanceListSupplier listSupplier = new HealthCheckServiceInstanceListSupplier( + listSupplier = new HealthCheckServiceInstanceListSupplier( delegate, healthCheck, webClient); From b2305b289bc100b23c63ae4ae6cf921824738bed Mon Sep 17 00:00:00 2001 From: Roman Matiushchenko Date: Mon, 10 Feb 2020 19:39:38 +0200 Subject: [PATCH 09/12] - implement InitializingBean - use Flux.replay() instead of caching in the field --- spring-cloud-loadbalancer/pom.xml | 5 + ...ealthCheckServiceInstanceListSupplier.java | 110 +++--- ...CheckServiceInstanceListSupplierTests.java | 328 ++++++++++++------ 3 files changed, 281 insertions(+), 162 deletions(-) diff --git a/spring-cloud-loadbalancer/pom.xml b/spring-cloud-loadbalancer/pom.xml index cc671baf8..a87a1e9d7 100644 --- a/spring-cloud-loadbalancer/pom.xml +++ b/spring-cloud-loadbalancer/pom.xml @@ -88,5 +88,10 @@ reactor-test test + + org.awaitility + awaitility + test + diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java index 567a3849a..68b904aab 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java @@ -18,7 +18,6 @@ import java.time.Duration; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -29,6 +28,7 @@ import reactor.core.publisher.Mono; import org.springframework.beans.factory.DisposableBean; +import org.springframework.beans.factory.InitializingBean; import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.loadbalancer.reactive.LoadBalancerProperties; import org.springframework.http.HttpStatus; @@ -45,7 +45,7 @@ * @since 2.2.0 */ public class HealthCheckServiceInstanceListSupplier - implements ServiceInstanceListSupplier, DisposableBean { + implements ServiceInstanceListSupplier, InitializingBean, DisposableBean { private static final Log LOG = LogFactory .getLog(HealthCheckServiceInstanceListSupplier.class); @@ -58,12 +58,10 @@ public class HealthCheckServiceInstanceListSupplier private final String defaultHealthCheckPath; - private volatile List instances = Collections.emptyList(); - - private volatile List healthyInstances = Collections.emptyList(); - private Disposable healthCheckDisposable; + private Flux> aliveInstancesReplay; + public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, LoadBalancerProperties.HealthCheck healthCheck, WebClient webClient) { this.delegate = delegate; @@ -71,50 +69,51 @@ public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delega this.defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default", "/actuator/health"); this.webClient = webClient; - initInstances(); - + this.aliveInstancesReplay = delegate.get() + .delaySubscription(Duration.ofMillis(this.healthCheck.getInitialDelay())) + .switchMap(serviceInstances -> healthCheckFlux(serviceInstances) + .map(alive -> (List) new ArrayList<>(alive)) + ) + .replay(1) + .refCount(1) + .onBackpressureLatest(); } - private void initInstances() { - Flux> instancesFlux = this.delegate.get().doOnNext(delegateInstances -> { - this.instances = Collections.unmodifiableList(new ArrayList<>(delegateInstances)); - }); - - Flux> verifiedInstancesFlux = healthCheckFlux() - .doOnNext((verifiedInstances -> { - this.healthyInstances = verifiedInstances; - })); - - this.healthCheckDisposable = Flux.merge(instancesFlux.then(), verifiedInstancesFlux.then()) - .subscribe(); + @Override + public void afterPropertiesSet() { + Disposable healthCheckDisposable = this.healthCheckDisposable; + if (healthCheckDisposable != null) { + healthCheckDisposable.dispose(); + } + this.healthCheckDisposable = aliveInstancesReplay.subscribe(); } - protected Flux> healthCheckFlux() { + protected Flux> healthCheckFlux(List instances) { return Flux.defer(() -> { - List> checks = new ArrayList<>(this.instances.size()); - for (ServiceInstance instance : this.instances) { - Mono alive = isAlive(instance) - .onErrorResume(error -> { - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "Exception occurred during health check of the instance for service %s: %s", - instance.getServiceId(), instance.getUri()), error); - } - return Mono.empty(); - }) - .timeout(this.healthCheck.getInterval(), Mono.defer(() -> { - if (LOG.isDebugEnabled()) { - LOG.debug(String.format( - "The instance for service %s: %s did not respond for %s during health check", - instance.getServiceId(), instance.getUri(), this.healthCheck.getInterval())); - } - return Mono.empty(); - })) - .handle((isHealthy, sink) -> { - if (isHealthy) { - sink.next(instance); - } - }); + List> checks = new ArrayList<>(instances.size()); + for (ServiceInstance instance : instances) { + Mono alive = isAlive(instance).onErrorResume(error -> { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "Exception occurred during health check of the instance for service %s: %s", + instance.getServiceId(), instance.getUri()), error); + } + return Mono.empty(); + }) + .timeout(this.healthCheck.getInterval(), Mono.defer(() -> { + if (LOG.isDebugEnabled()) { + LOG.debug(String.format( + "The instance for service %s: %s did not respond for %s during health check", + instance.getServiceId(), instance.getUri(), + this.healthCheck.getInterval())); + } + return Mono.empty(); + })) + .handle((isHealthy, sink) -> { + if (isHealthy) { + sink.next(instance); + } + }); checks.add(alive); } @@ -125,8 +124,7 @@ protected Flux> healthCheckFlux() { }) .defaultIfEmpty(result); }) - .repeatWhen(restart -> restart.delayElements(this.healthCheck.getInterval())) - .delaySubscription(Duration.ofMillis(this.healthCheck.getInitialDelay())); + .repeatWhen(restart -> restart.delayElements(this.healthCheck.getInterval())); } @Override @@ -136,18 +134,7 @@ public String getServiceId() { @Override public Flux> get() { - return Flux.defer(() -> { - List it = new ArrayList<>(this.healthyInstances); - if (it.isEmpty()) { - // If there are no healthy instances, it might be better to still retry on all of - // them - if (LOG.isWarnEnabled()) { - LOG.warn("No verified healthy instances were found, returning all listed instances."); - } - it = this.instances; - } - return Flux.just(it); - }); + return aliveInstancesReplay; } protected Mono isAlive(ServiceInstance serviceInstance) { @@ -166,7 +153,10 @@ protected Mono isAlive(ServiceInstance serviceInstance) { @Override public void destroy() { - this.healthCheckDisposable.dispose(); + Disposable healthCheckDisposable = this.healthCheckDisposable; + if (healthCheckDisposable != null) { + healthCheckDisposable.dispose(); + } } } diff --git a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java index 36c14389f..ef0007834 100644 --- a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java +++ b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java @@ -22,6 +22,7 @@ import org.assertj.core.api.Assertions; import org.assertj.core.util.Lists; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -58,7 +59,9 @@ classes = HealthCheckServiceInstanceListSupplierTests.TestApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) class HealthCheckServiceInstanceListSupplierTests { + private static final String SERVICE_ID = "ignored-service"; + private static final Duration VERIFY_TIMEOUT = Duration.ofSeconds(5); @LocalServerPort @@ -134,52 +137,67 @@ void shouldReturnFalseIfEndpointNotFound() { void shouldReturnOnlyAliveService() { healthCheck.setInitialDelay(1000); - ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", - SERVICE_ID, "127.0.0.1", port, false); - ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", - SERVICE_ID, "127.0.0.2", port, false); + ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, + "127.0.0.1", port, false); + ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID, + "127.0.0.2", port, false); StepVerifier.withVirtualTime(() -> { - ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); + ServiceInstanceListSupplier delegate = Mockito + .mock(ServiceInstanceListSupplier.class); Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2))); - listSupplier = new HealthCheckServiceInstanceListSupplier( - delegate, - healthCheck, webClient); - - listSupplier = Mockito.spy(listSupplier); - Mockito.doReturn(Mono.just(true)).when(listSupplier).isAlive(si1); - Mockito.doReturn(Mono.just(false)).when(listSupplier).isAlive(si2); - return listSupplier.healthCheckFlux(); + + HealthCheckServiceInstanceListSupplier mock = Mockito + .mock(HealthCheckServiceInstanceListSupplier.class); + Mockito.doReturn(Mono.just(true)).when(mock).isAlive(si1); + Mockito.doReturn(Mono.just(false)).when(mock).isAlive(si2); + + listSupplier = new HealthCheckServiceInstanceListSupplier(delegate, + healthCheck, webClient) { + @Override + protected Mono isAlive(ServiceInstance serviceInstance) { + return mock.isAlive(serviceInstance); + } + }; + + return listSupplier.get(); }) .expectSubscription() .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay())) .expectNext(Lists.list(si1)) .expectNoEvent(healthCheck.getInterval()) - .thenCancel() - .verify(VERIFY_TIMEOUT); + .thenCancel().verify(VERIFY_TIMEOUT); } @Test void shouldEmitOnEachAliveServiceInBatch() { healthCheck.setInitialDelay(1000); - ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", - SERVICE_ID, "127.0.0.1", port, false); - ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", - SERVICE_ID, "127.0.0.2", port, false); + ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, + "127.0.0.1", port, false); + ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID, + "127.0.0.2", port, false); StepVerifier.withVirtualTime(() -> { - ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); + ServiceInstanceListSupplier delegate = Mockito + .mock(ServiceInstanceListSupplier.class); Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2))); - listSupplier = new HealthCheckServiceInstanceListSupplier( - delegate, - healthCheck, webClient); - - listSupplier = Mockito.spy(listSupplier); - Mockito.doReturn(Mono.just(true)).when(listSupplier).isAlive(si1); - Mockito.doReturn(Mono.just(true)).when(listSupplier).isAlive(si2); - return listSupplier.healthCheckFlux(); + + HealthCheckServiceInstanceListSupplier mock = Mockito + .mock(HealthCheckServiceInstanceListSupplier.class); + Mockito.doReturn(Mono.just(true)).when(mock).isAlive(si1); + Mockito.doReturn(Mono.just(true)).when(mock).isAlive(si2); + + listSupplier = new HealthCheckServiceInstanceListSupplier(delegate, + healthCheck, webClient) { + @Override + protected Mono isAlive(ServiceInstance serviceInstance) { + return mock.isAlive(serviceInstance); + } + }; + + return listSupplier.get(); }) .expectSubscription() .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay())) @@ -193,26 +211,35 @@ void shouldEmitOnEachAliveServiceInBatch() { @Test void shouldNotFailIfIsAliveReturnsError() { healthCheck.setInitialDelay(1000); - ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", - SERVICE_ID, "127.0.0.1", port, false); - ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", - SERVICE_ID, "127.0.0.2", port, false); + ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, + "127.0.0.1", port, false); + ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID, + "127.0.0.2", port, false); StepVerifier.withVirtualTime(() -> { - ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); + ServiceInstanceListSupplier delegate = Mockito + .mock(ServiceInstanceListSupplier.class); Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2))); - listSupplier = new HealthCheckServiceInstanceListSupplier( - delegate, - healthCheck, webClient); - - listSupplier = Mockito.spy(listSupplier); - Mockito.doReturn(Mono.just(true)).when(listSupplier).isAlive(si1); - Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(listSupplier).isAlive(si2); - return listSupplier.healthCheckFlux(); + + HealthCheckServiceInstanceListSupplier mock = Mockito + .mock(HealthCheckServiceInstanceListSupplier.class); + Mockito.doReturn(Mono.just(true)).when(mock).isAlive(si1); + Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(mock) + .isAlive(si2); + + listSupplier = new HealthCheckServiceInstanceListSupplier(delegate, + healthCheck, webClient) { + @Override + protected Mono isAlive(ServiceInstance serviceInstance) { + return mock.isAlive(serviceInstance); + } + }; + + return listSupplier.get(); }) .expectSubscription() - .thenAwait(Duration.ofMillis(healthCheck.getInitialDelay())) + .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay())) .expectNext(Lists.list(si1)) .expectNoEvent(healthCheck.getInterval()) .thenCancel() @@ -220,25 +247,32 @@ void shouldNotFailIfIsAliveReturnsError() { } @Test - void shouldEmitEmptyListIfAllIsAliveChecksFailed() { + void shouldEmitAllInstancesIfAllIsAliveChecksFailed() { healthCheck.setInitialDelay(1000); - ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", - SERVICE_ID, "127.0.0.1", port, false); - ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", - SERVICE_ID, "127.0.0.2", port, false); + ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, + "127.0.0.1", port, false); + ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID, + "127.0.0.2", port, false); StepVerifier.withVirtualTime(() -> { - ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); + ServiceInstanceListSupplier delegate = Mockito + .mock(ServiceInstanceListSupplier.class); Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2))); - listSupplier = new HealthCheckServiceInstanceListSupplier( - delegate, - healthCheck, webClient); - - listSupplier = Mockito.spy(listSupplier); - Mockito.doReturn(Mono.just(false)).when(listSupplier).isAlive(si1); - Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(listSupplier).isAlive(si2); - return listSupplier.healthCheckFlux(); + listSupplier = new HealthCheckServiceInstanceListSupplier(delegate, + healthCheck, webClient) { + @Override + protected Mono isAlive(ServiceInstance serviceInstance) { + if (serviceInstance == si1) { + return Mono.just(false); + } + else { + return Mono.error(new RuntimeException("boom")); + } + } + }; + + return listSupplier.get(); }) .expectSubscription() .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay())) @@ -248,26 +282,64 @@ void shouldEmitEmptyListIfAllIsAliveChecksFailed() { .verify(VERIFY_TIMEOUT); } + @Test + void shouldMakeInitialDaleyAfterPropertiesSet() { + healthCheck.setInitialDelay(1000); + ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, + "127.0.0.1", port, false); + + StepVerifier.withVirtualTime(() -> { + ServiceInstanceListSupplier delegate = Mockito + .mock(ServiceInstanceListSupplier.class); + Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); + Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1))); + listSupplier = new HealthCheckServiceInstanceListSupplier(delegate, + healthCheck, webClient) { + @Override + protected Mono isAlive(ServiceInstance serviceInstance) { + return Mono.just(true); + } + }; + + listSupplier.afterPropertiesSet(); + + return listSupplier.get(); + }) + .expectSubscription() + .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay())) + .expectNext(Lists.list(si1)).expectNoEvent(healthCheck.getInterval()) + .thenCancel().verify(VERIFY_TIMEOUT); + } + @Test void shouldRepeatIsAliveChecksIndefinitely() { healthCheck.setInitialDelay(1000); - ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", - SERVICE_ID, "127.0.0.1", port, false); - ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", - SERVICE_ID, "127.0.0.2", port, false); + ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, + "127.0.0.1", port, false); + ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID, + "127.0.0.2", port, false); StepVerifier.withVirtualTime(() -> { - ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); + ServiceInstanceListSupplier delegate = Mockito + .mock(ServiceInstanceListSupplier.class); Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1, si2))); - listSupplier = new HealthCheckServiceInstanceListSupplier( - delegate, - healthCheck, webClient); - - listSupplier = Mockito.spy(listSupplier); - Mockito.doReturn(Mono.just(false), Mono.just(true)).when(listSupplier).isAlive(si1); - Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(listSupplier).isAlive(si2); - return listSupplier.healthCheckFlux(); + + HealthCheckServiceInstanceListSupplier mock = Mockito + .mock(HealthCheckServiceInstanceListSupplier.class); + Mockito.doReturn(Mono.just(false), Mono.just(true)).when(mock).isAlive(si1); + Mockito.doReturn(Mono.error(new RuntimeException("boom"))).when(mock) + .isAlive(si2); + + listSupplier = new HealthCheckServiceInstanceListSupplier(delegate, + healthCheck, webClient) { + @Override + protected Mono isAlive(ServiceInstance serviceInstance) { + return mock.isAlive(serviceInstance); + } + }; + + return listSupplier.get(); }) .expectSubscription() .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay())) @@ -276,57 +348,64 @@ void shouldRepeatIsAliveChecksIndefinitely() { .expectNext(Lists.list(si1)) .expectNoEvent(healthCheck.getInterval()) .expectNext(Lists.list(si1)) - .thenCancel() - .verify(VERIFY_TIMEOUT); + .thenCancel().verify(VERIFY_TIMEOUT); } @Test void shouldTimeoutIsAliveCheck() { healthCheck.setInitialDelay(1000); - ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", - SERVICE_ID, "127.0.0.1", port, false); + ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, + "127.0.0.1", port, false); StepVerifier.withVirtualTime(() -> { - ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); + ServiceInstanceListSupplier delegate = Mockito + .mock(ServiceInstanceListSupplier.class); Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1))); - listSupplier = new HealthCheckServiceInstanceListSupplier( - delegate, - healthCheck, webClient); - - listSupplier = Mockito.spy(listSupplier); - Mockito.doReturn(Mono.never(), Mono.just(true)) - .when(listSupplier).isAlive(si1); - return listSupplier.healthCheckFlux(); + + HealthCheckServiceInstanceListSupplier mock = Mockito + .mock(HealthCheckServiceInstanceListSupplier.class); + Mockito.when(mock.isAlive(si1)).thenReturn(Mono.never(), Mono.just(true)); + + listSupplier = new HealthCheckServiceInstanceListSupplier(delegate, + healthCheck, webClient) { + @Override + protected Mono isAlive(ServiceInstance serviceInstance) { + return mock.isAlive(serviceInstance); + } + }; + + return listSupplier.get(); }) .expectSubscription() - .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay()).plus(healthCheck.getInterval())) + .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay())) + .expectNoEvent(healthCheck.getInterval()) .expectNext(Lists.list()) .expectNoEvent(healthCheck.getInterval()) .expectNext(Lists.list(si1)) .expectNoEvent(healthCheck.getInterval()) .expectNext(Lists.list(si1)) - .thenCancel() - .verify(VERIFY_TIMEOUT); + .thenCancel().verify(VERIFY_TIMEOUT); } @Test void shouldUpdateInstances() { healthCheck.setInitialDelay(1000); - ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", - SERVICE_ID, "127.0.0.1", port, false); - ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", - SERVICE_ID, "127.0.0.2", port, false); + ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, + "127.0.0.1", port, false); + ServiceInstance si2 = new DefaultServiceInstance("ignored-service-2", SERVICE_ID, + "127.0.0.2", port, false); StepVerifier.withVirtualTime(() -> { - ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); + ServiceInstanceListSupplier delegate = Mockito + .mock(ServiceInstanceListSupplier.class); Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); Flux> instances = Flux.just(Lists.list(si1)) - .concatWith(Flux.just(Lists.list(si1, si2)).delayElements(healthCheck.getInterval().dividedBy(2))); + .concatWith(Flux.just(Lists.list(si1, si2)) + .delayElements(healthCheck.getInterval().dividedBy(2))); Mockito.when(delegate.get()).thenReturn(instances); - listSupplier = new HealthCheckServiceInstanceListSupplier( - delegate, + listSupplier = new HealthCheckServiceInstanceListSupplier(delegate, healthCheck, webClient) { @Override protected Mono isAlive(ServiceInstance serviceInstance) { @@ -334,12 +413,12 @@ protected Mono isAlive(ServiceInstance serviceInstance) { } }; - return listSupplier.healthCheckFlux(); + return listSupplier.get(); }) .expectSubscription() .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay())) .expectNext(Lists.list(si1)) - .expectNoEvent(healthCheck.getInterval()) + .thenAwait(healthCheck.getInterval().dividedBy(2)) .expectNext(Lists.list(si1)) .expectNext(Lists.list(si1, si2)) .expectNoEvent(healthCheck.getInterval()) @@ -349,24 +428,69 @@ protected Mono isAlive(ServiceInstance serviceInstance) { .verify(VERIFY_TIMEOUT); } + @Test + void shouldCacheResultIfAfterPropertiesSetInvoked() { + healthCheck.setInitialDelay(1000); + ServiceInstance si1 = new DefaultServiceInstance("ignored-service-1", SERVICE_ID, + "127.0.0.1", port, false); + + AtomicInteger emitCounter = new AtomicInteger(); + + StepVerifier.withVirtualTime(() -> { + ServiceInstanceListSupplier delegate = Mockito + .mock(ServiceInstanceListSupplier.class); + Mockito.when(delegate.getServiceId()).thenReturn(SERVICE_ID); + Mockito.when(delegate.get()).thenReturn(Flux.just(Lists.list(si1))); + + listSupplier = new HealthCheckServiceInstanceListSupplier(delegate, + healthCheck, webClient) { + @Override + protected Mono isAlive(ServiceInstance serviceInstance) { + return Mono.just(true); + } + + @Override + protected Flux> healthCheckFlux(List instances) { + return super.healthCheckFlux(instances).doOnNext(it -> emitCounter.incrementAndGet()); + } + }; + + listSupplier.afterPropertiesSet(); + + return listSupplier.get().take(1).concatWith(listSupplier.get().take(1)); + }) + .expectSubscription() + .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay())) + .expectNext(Lists.list(si1)) + .expectNext(Lists.list(si1)) + .thenCancel() + .verify(VERIFY_TIMEOUT); + + Assertions.assertThat(emitCounter).hasValue(1); + } + @Test void shouldCancelSubscription() { final AtomicInteger instancesCanceled = new AtomicInteger(); - ServiceInstanceListSupplier delegate = Mockito.mock(ServiceInstanceListSupplier.class); - Mockito.when(delegate.get()).thenReturn(Flux.>never() + ServiceInstanceListSupplier delegate = Mockito + .mock(ServiceInstanceListSupplier.class); + Mockito.when(delegate.get()) + .thenReturn(Flux.>never() + .log("test") .doOnCancel(instancesCanceled::incrementAndGet)); - listSupplier = new HealthCheckServiceInstanceListSupplier( - delegate, - healthCheck, webClient); + listSupplier = new HealthCheckServiceInstanceListSupplier(delegate, healthCheck, webClient); + + listSupplier.afterPropertiesSet(); Assertions.assertThat(instancesCanceled).hasValue(0); listSupplier.destroy(); - - Assertions.assertThat(instancesCanceled).hasValue(1); + Awaitility.await() + .pollDelay(Duration.ofMillis(100)).atMost(VERIFY_TIMEOUT).untilAsserted( + () -> Assertions.assertThat(instancesCanceled).hasValue(1)); } @Configuration(proxyBeanMethods = false) From 515c5b9d882e404e636ae3a9075f856213e662e6 Mon Sep 17 00:00:00 2001 From: Roman Matiushchenko Date: Mon, 10 Feb 2020 23:46:09 +0200 Subject: [PATCH 10/12] remove unnecessary onBackpressureLatest --- .../core/HealthCheckServiceInstanceListSupplier.java | 7 +++---- .../HealthCheckServiceInstanceListSupplierTests.java | 9 ++++++--- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java index 68b904aab..eb3e2c027 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java @@ -58,9 +58,9 @@ public class HealthCheckServiceInstanceListSupplier private final String defaultHealthCheckPath; - private Disposable healthCheckDisposable; + private final Flux> aliveInstancesReplay; - private Flux> aliveInstancesReplay; + private Disposable healthCheckDisposable; public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, LoadBalancerProperties.HealthCheck healthCheck, WebClient webClient) { @@ -75,8 +75,7 @@ public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delega .map(alive -> (List) new ArrayList<>(alive)) ) .replay(1) - .refCount(1) - .onBackpressureLatest(); + .refCount(1); } @Override diff --git a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java index ef0007834..7d85ccb1f 100644 --- a/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java +++ b/spring-cloud-loadbalancer/src/test/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplierTests.java @@ -167,7 +167,8 @@ protected Mono isAlive(ServiceInstance serviceInstance) { .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay())) .expectNext(Lists.list(si1)) .expectNoEvent(healthCheck.getInterval()) - .thenCancel().verify(VERIFY_TIMEOUT); + .thenCancel() + .verify(VERIFY_TIMEOUT); } @Test @@ -307,8 +308,10 @@ protected Mono isAlive(ServiceInstance serviceInstance) { }) .expectSubscription() .expectNoEvent(Duration.ofMillis(healthCheck.getInitialDelay())) - .expectNext(Lists.list(si1)).expectNoEvent(healthCheck.getInterval()) - .thenCancel().verify(VERIFY_TIMEOUT); + .expectNext(Lists.list(si1)) + .expectNoEvent(healthCheck.getInterval()) + .thenCancel() + .verify(VERIFY_TIMEOUT); } @Test From bab10c2162c4241d5338ff14fa4172dbe38e45b4 Mon Sep 17 00:00:00 2001 From: Roman Matiushchenko Date: Tue, 11 Feb 2020 21:34:35 +0200 Subject: [PATCH 11/12] - cache unmodifiable List - make lazy delegate call --- .../core/HealthCheckServiceInstanceListSupplier.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java index eb3e2c027..dc1f5bda5 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java @@ -18,6 +18,7 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -69,10 +70,10 @@ public HealthCheckServiceInstanceListSupplier(ServiceInstanceListSupplier delega this.defaultHealthCheckPath = healthCheck.getPath().getOrDefault("default", "/actuator/health"); this.webClient = webClient; - this.aliveInstancesReplay = delegate.get() + this.aliveInstancesReplay = Flux.defer(delegate) .delaySubscription(Duration.ofMillis(this.healthCheck.getInitialDelay())) .switchMap(serviceInstances -> healthCheckFlux(serviceInstances) - .map(alive -> (List) new ArrayList<>(alive)) + .map(alive -> Collections.unmodifiableList(new ArrayList<>(alive))) ) .replay(1) .refCount(1); From 16bbe53942bd891f360ff2ed91f23b241046a56d Mon Sep 17 00:00:00 2001 From: Roman Matiushchenko Date: Tue, 3 Mar 2020 12:02:29 +0200 Subject: [PATCH 12/12] use ArrayList as emits are sequential so it is safe --- .../core/HealthCheckServiceInstanceListSupplier.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java index dc1f5bda5..86efcaae5 100644 --- a/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java +++ b/spring-cloud-loadbalancer/src/main/java/org/springframework/cloud/loadbalancer/core/HealthCheckServiceInstanceListSupplier.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -117,7 +116,7 @@ protected Flux> healthCheckFlux(List inst checks.add(alive); } - List result = new CopyOnWriteArrayList<>(); + List result = new ArrayList<>(); return Flux.merge(checks).map(alive -> { result.add(alive); return result;