From 834df4f10d0cdac28cae0c7d7d0e73838b39bfa4 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Fri, 6 Jan 2023 11:59:43 -0800 Subject: [PATCH 1/7] Update logic so that an error being reported when AbstractXdsClient.closed == true gets propagated to the subscriber. Additionally, add some cleanup. --- .../java/io/grpc/xds/AbstractXdsClient.java | 23 ++++++++++---- .../io/grpc/xds/XdsClientImplTestBase.java | 30 +++++++++++++++++++ 2 files changed, 48 insertions(+), 5 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java index f6b82775f1e..bbb09c997bc 100644 --- a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java @@ -217,6 +217,11 @@ void readyHandler() { return; } + if (isInBackoff()) { + rpcRetryTimer.cancel(); + rpcRetryTimer = null; + } + timerLaunch.startSubscriberTimersIfNeeded(serverInfo); } @@ -319,17 +324,25 @@ final void handleRpcCompleted() { } private void handleRpcStreamClosed(Status error) { - checkArgument(!error.isOk(), "unexpected OK status"); - if (closed) { + if (shutdown) { + closed = true; + if (rpcRetryTimer != null && rpcRetryTimer.isPending()) { + rpcRetryTimer.cancel(); + } return; } + + checkArgument(!error.isOk(), "unexpected OK status"); + String errorMsg = + closed + ? "ADS stream failed with status {0}: {1}. Cause: {2}" + : "ADS stream closed with status {0}: {1}. Cause: {2}"; logger.log( - XdsLogLevel.ERROR, - "ADS stream closed with status {0}: {1}. Cause: {2}", - error.getCode(), error.getDescription(), error.getCause()); + XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause()); closed = true; xdsResponseHandler.handleStreamClosed(error); cleanUp(); + if (responseReceived || retryBackoffPolicy == null) { // Reset the backoff sequence if had received a response, or backoff sequence // has never been initialized. diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java index 3ee6c916c2e..e83feea6188 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java @@ -107,6 +107,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; import org.junit.After; +import org.junit.Assert; import org.junit.Assume; import org.junit.Before; import org.junit.Rule; @@ -3592,6 +3593,35 @@ public void sendingToStoppedServer() throws Exception { } } + @Test + public void sendingToPermanentlyStoppedServer() throws Exception { + // Setup xdsClient to fail on stream creation + XdsChannelFactory xdsChannelFactory = new XdsChannelFactory() { + @Override + ManagedChannel create(ServerInfo serverInfo) { + throw new IllegalArgumentException("Can not create channel for " + serverInfo); + } + }; + xdsClient = + new XdsClientImpl( + xdsChannelFactory, + xdsClient.getBootstrapInfo(), + Context.ROOT, + fakeClock.getScheduledExecutorService(), + backoffPolicyProvider, + fakeClock.getStopwatchSupplier(), + timeProvider, + tlsContextManager); + + try { + xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, + ldsResourceWatcher); + Assert.fail("Expected failure creating stream didn't happen"); + } catch (AssertionError e) { + assertThat(e).hasMessageThat().contains( + "Can not create channel for ServerInfo{target=trafficdirector.googleapis.com"); + } + } private DiscoveryRpcCall startResourceWatcher( XdsResourceType type, String name, ResourceWatcher watcher) { From 7865536b909642f98f71c917db482caf75f699f5 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Fri, 6 Jan 2023 15:04:29 -0800 Subject: [PATCH 2/7] Enhance test for catching problems with server url --- .../io/grpc/xds/XdsClientImplTestBase.java | 82 ++++++++++++++----- 1 file changed, 60 insertions(+), 22 deletions(-) diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java index e83feea6188..f01225607d9 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java @@ -18,6 +18,7 @@ import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertWithMessage; +import static io.grpc.xds.XdsClientImpl.XdsChannelFactory.DEFAULT_XDS_CHANNEL_FACTORY; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; @@ -70,6 +71,7 @@ import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcCleanupRule; import io.grpc.xds.Bootstrapper.AuthorityInfo; +import io.grpc.xds.Bootstrapper.BootstrapInfo; import io.grpc.xds.Bootstrapper.CertificateProviderInfo; import io.grpc.xds.Bootstrapper.ServerInfo; import io.grpc.xds.Endpoints.DropOverload; @@ -95,6 +97,7 @@ import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import io.grpc.xds.internal.security.CommonTlsContextTestsUtil; import java.io.IOException; +import java.net.UnknownHostException; import java.util.ArrayDeque; import java.util.Arrays; import java.util.Collections; @@ -3594,33 +3597,68 @@ public void sendingToStoppedServer() throws Exception { } @Test - public void sendingToPermanentlyStoppedServer() throws Exception { + public void sendToBadUrl() throws Exception { // Setup xdsClient to fail on stream creation - XdsChannelFactory xdsChannelFactory = new XdsChannelFactory() { - @Override - ManagedChannel create(ServerInfo serverInfo) { - throw new IllegalArgumentException("Can not create channel for " + serverInfo); - } - }; - xdsClient = - new XdsClientImpl( - xdsChannelFactory, - xdsClient.getBootstrapInfo(), - Context.ROOT, - fakeClock.getScheduledExecutorService(), - backoffPolicyProvider, - fakeClock.getStopwatchSupplier(), - timeProvider, - tlsContextManager); + XdsClientImpl client = createXdsClient("some. garbage"); try { - xdsClient.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, - ldsResourceWatcher); - Assert.fail("Expected failure creating stream didn't happen"); + client.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); + fakeClock.forwardTime(20, TimeUnit.SECONDS); } catch (AssertionError e) { - assertThat(e).hasMessageThat().contains( - "Can not create channel for ServerInfo{target=trafficdirector.googleapis.com"); + assertThat(e.getCause()).isInstanceOf(IllegalArgumentException.class); + return; } + Assert.fail("Expected exception for bad url not thrown"); + verify(ldsResourceWatcher).onError(errorCaptor.capture()); + assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isNotEmpty(); + } + + @Test + public void sendToNonexistentHost() throws Exception { + // Setup xdsClient to fail on stream creation + XdsClientImpl client = createXdsClient("some.garbage"); + client.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); + fakeClock.forwardTime(20, TimeUnit.SECONDS); + + verify(ldsResourceWatcher).onError(errorCaptor.capture()); + assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isNotEmpty(); + } + + private XdsClientImpl createXdsClient(String serverUri) { + BootstrapInfo bootstrapInfo = buildBootStrap(serverUri); + return new XdsClientImpl( + DEFAULT_XDS_CHANNEL_FACTORY, + bootstrapInfo, + Context.ROOT, + fakeClock.getScheduledExecutorService(), + backoffPolicyProvider, + fakeClock.getStopwatchSupplier(), + timeProvider, + tlsContextManager); + } + + private BootstrapInfo buildBootStrap(String serverUri) { + + ServerInfo xdsServerInfo = ServerInfo.create(serverUri, CHANNEL_CREDENTIALS, + ignoreResourceDeletion()); + + return Bootstrapper.BootstrapInfo.builder() + .servers(Collections.singletonList(xdsServerInfo)) + .node(NODE) + .authorities(ImmutableMap.of( + "authority.xds.com", + AuthorityInfo.create( + "xdstp://authority.xds.com/envoy.config.listener.v3.Listener/%s", + ImmutableList.of(Bootstrapper.ServerInfo.create( + SERVER_URI_CUSTOME_AUTHORITY, CHANNEL_CREDENTIALS))), + "", + AuthorityInfo.create( + "xdstp:///envoy.config.listener.v3.Listener/%s", + ImmutableList.of(Bootstrapper.ServerInfo.create( + SERVER_URI_EMPTY_AUTHORITY, CHANNEL_CREDENTIALS))))) + .certProviders(ImmutableMap.of("cert-instance-name", + CertificateProviderInfo.create("file-watcher", ImmutableMap.of()))) + .build(); } private DiscoveryRpcCall startResourceWatcher( From f62c91aeb42f70a925cd1face302e3f47f8e3551 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Fri, 6 Jan 2023 15:05:32 -0800 Subject: [PATCH 3/7] remove extra import --- xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java index f01225607d9..8ad03a06757 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java @@ -97,7 +97,6 @@ import io.grpc.xds.XdsRouteConfigureResource.RdsUpdate; import io.grpc.xds.internal.security.CommonTlsContextTestsUtil; import java.io.IOException; -import java.net.UnknownHostException; import java.util.ArrayDeque; import java.util.Arrays; import java.util.Collections; From 9f77e0f6ad0b6a35c28c458892d6b74e42049eca Mon Sep 17 00:00:00 2001 From: larry-safran Date: Mon, 9 Jan 2023 17:00:28 -0800 Subject: [PATCH 4/7] Stop setting waitForReady in XdsClient's AbstractXdsClient. Fix test case to deal with asynchronous flow. --- .../java/io/grpc/xds/AbstractXdsClient.java | 2 +- .../java/io/grpc/xds/XdsClientImplTestBase.java | 17 ++++++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java index bbb09c997bc..cbf2ee94809 100644 --- a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java @@ -436,7 +436,7 @@ public void run() { }); } }; - requestWriter = stub.withWaitForReady().streamAggregatedResources(responseReader); + requestWriter = stub.streamAggregatedResources(responseReader); } @Override diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java index 8ad03a06757..307d6b9fc24 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java @@ -117,6 +117,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; import org.mockito.Captor; import org.mockito.InOrder; import org.mockito.Mock; @@ -3576,13 +3577,18 @@ public void sendingToStoppedServer() throws Exception { .build() .start()); fakeClock.forwardTime(5, TimeUnit.SECONDS); + verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE); + fakeClock.forwardTime(20, TimeUnit.SECONDS); // Trigger rpcRetryTimer DiscoveryRpcCall call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS); + if (call == null) { // The first rpcRetry may have happened before the channel was ready + fakeClock.forwardTime(50, TimeUnit.SECONDS); + call = resourceDiscoveryCalls.poll(3, TimeUnit.SECONDS); + } // NOTE: There is a ScheduledExecutorService that may get involved due to the reconnect // so you cannot rely on the logic being single threaded. The timeout() in verifyRequest // is therefore necessary to avoid flakiness. // Send a response and do verifications - verify(ldsResourceWatcher, never()).onResourceDoesNotExist(LDS_RESOURCE); call.sendResponse(LDS, mf.buildWrappedResource(testListenerVhosts), VERSION_1, "0001"); call.verifyRequest(LDS, LDS_RESOURCE, VERSION_1, "0001", NODE); verify(ldsResourceWatcher).onChanged(ldsUpdateCaptor.capture()); @@ -3619,8 +3625,13 @@ public void sendToNonexistentHost() throws Exception { client.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); fakeClock.forwardTime(20, TimeUnit.SECONDS); - verify(ldsResourceWatcher).onError(errorCaptor.capture()); - assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isNotEmpty(); + verify(ldsResourceWatcher, Mockito.timeout(5000).times(1)).onError(ArgumentMatchers.any()); + fakeClock.forwardTime(50, TimeUnit.SECONDS); // Trigger rpcRetry if appropriate + assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); + // Get rid of rpcRetryTask that should have been scheduled since still cannot talk to server + for (ScheduledTask task : fakeClock.getPendingTasks()) { + task.cancel(true); + } } private XdsClientImpl createXdsClient(String serverUri) { From b5783fc80db46632001fb8ec2e23416c688a1d64 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Wed, 18 Jan 2023 18:04:02 -0800 Subject: [PATCH 5/7] Address code review comment --- xds/src/main/java/io/grpc/xds/AbstractXdsClient.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java index cbf2ee94809..29474327383 100644 --- a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java @@ -325,10 +325,6 @@ final void handleRpcCompleted() { private void handleRpcStreamClosed(Status error) { if (shutdown) { - closed = true; - if (rpcRetryTimer != null && rpcRetryTimer.isPending()) { - rpcRetryTimer.cancel(); - } return; } From ea01796065f81ce426289c670f52f5f2e997a2cd Mon Sep 17 00:00:00 2001 From: larry-safran Date: Thu, 19 Jan 2023 14:15:42 -0800 Subject: [PATCH 6/7] Cleanup unneeded code and shutdown test specific XdsClients. --- xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java index 307d6b9fc24..40352fab2ae 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java @@ -3611,11 +3611,10 @@ public void sendToBadUrl() throws Exception { fakeClock.forwardTime(20, TimeUnit.SECONDS); } catch (AssertionError e) { assertThat(e.getCause()).isInstanceOf(IllegalArgumentException.class); + client.shutdown(); return; } Assert.fail("Expected exception for bad url not thrown"); - verify(ldsResourceWatcher).onError(errorCaptor.capture()); - assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isNotEmpty(); } @Test @@ -3628,10 +3627,7 @@ public void sendToNonexistentHost() throws Exception { verify(ldsResourceWatcher, Mockito.timeout(5000).times(1)).onError(ArgumentMatchers.any()); fakeClock.forwardTime(50, TimeUnit.SECONDS); // Trigger rpcRetry if appropriate assertThat(fakeClock.getPendingTasks(LDS_RESOURCE_FETCH_TIMEOUT_TASK_FILTER)).isEmpty(); - // Get rid of rpcRetryTask that should have been scheduled since still cannot talk to server - for (ScheduledTask task : fakeClock.getPendingTasks()) { - task.cancel(true); - } + client.shutdown(); } private XdsClientImpl createXdsClient(String serverUri) { From 88c760da91ac8289b450552d4668b231780f3d60 Mon Sep 17 00:00:00 2001 From: larry-safran Date: Tue, 24 Jan 2023 17:31:23 -0800 Subject: [PATCH 7/7] Handle bad URL cleanly. Go back to using closed instead of shutdown for gating handleRpcStreamClosed --- .../java/io/grpc/xds/AbstractXdsClient.java | 14 +++++++------ .../main/java/io/grpc/xds/XdsClientImpl.java | 17 +++++++++++++--- .../io/grpc/xds/XdsClientImplTestBase.java | 20 ++++++++----------- 3 files changed, 30 insertions(+), 21 deletions(-) diff --git a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java index 29474327383..b5384616925 100644 --- a/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java +++ b/xds/src/main/java/io/grpc/xds/AbstractXdsClient.java @@ -61,6 +61,8 @@ * the xDS RPC stream. */ final class AbstractXdsClient { + + public static final String CLOSED_BY_SERVER = "Closed by server"; private final SynchronizationContext syncContext; private final InternalLogId logId; private final XdsLogger logger; @@ -320,19 +322,19 @@ final void handleRpcError(Throwable t) { } final void handleRpcCompleted() { - handleRpcStreamClosed(Status.UNAVAILABLE.withDescription("Closed by server")); + handleRpcStreamClosed(Status.UNAVAILABLE.withDescription(CLOSED_BY_SERVER)); } private void handleRpcStreamClosed(Status error) { - if (shutdown) { + if (closed) { return; } checkArgument(!error.isOk(), "unexpected OK status"); - String errorMsg = - closed - ? "ADS stream failed with status {0}: {1}. Cause: {2}" - : "ADS stream closed with status {0}: {1}. Cause: {2}"; + String errorMsg = error.getDescription() != null + && error.getDescription().equals(CLOSED_BY_SERVER) + ? "ADS stream closed with status {0}: {1}. Cause: {2}" + : "ADS stream failed with status {0}: {1}. Cause: {2}"; logger.log( XdsLogLevel.ERROR, errorMsg, error.getCode(), error.getDescription(), error.getCause()); closed = true; diff --git a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java index e7ed64aea87..e67bff12871 100644 --- a/xds/src/main/java/io/grpc/xds/XdsClientImpl.java +++ b/xds/src/main/java/io/grpc/xds/XdsClientImpl.java @@ -514,11 +514,22 @@ private final class ResourceSubscriber { // Initialize metadata in UNKNOWN state to cover the case when resource subscriber, // is created but not yet requested because the client is in backoff. this.metadata = ResourceMetadata.newResourceMetadataUnknown(); - maybeCreateXdsChannelWithLrs(serverInfo); - this.xdsChannel = serverChannelMap.get(serverInfo); - if (xdsChannel.isInBackoff()) { + + AbstractXdsClient xdsChannelTemp = null; + try { + maybeCreateXdsChannelWithLrs(serverInfo); + xdsChannelTemp = serverChannelMap.get(serverInfo); + if (xdsChannelTemp.isInBackoff()) { + return; + } + } catch (IllegalArgumentException e) { + xdsChannelTemp = null; + this.errorDescription = "Bad configuration: " + e.getMessage(); return; + } finally { + this.xdsChannel = xdsChannelTemp; } + restartTimer(); } diff --git a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java index 40352fab2ae..0f18d3d387f 100644 --- a/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java +++ b/xds/src/test/java/io/grpc/xds/XdsClientImplTestBase.java @@ -109,7 +109,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; import org.junit.After; -import org.junit.Assert; import org.junit.Assume; import org.junit.Before; import org.junit.Rule; @@ -3230,7 +3229,8 @@ public void streamClosedAndRetryWithBackoff() { // Management server closes the RPC stream with an error. call.sendError(Status.UNKNOWN.asException()); - verify(ldsResourceWatcher).onError(errorCaptor.capture()); + verify(ldsResourceWatcher, Mockito.timeout(1000).times(1)) + .onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, ""); verify(rdsResourceWatcher).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNKNOWN, ""); @@ -3340,7 +3340,8 @@ public void streamClosedAndRetryRaceWithAddRemoveWatchers() { RDS_RESOURCE, rdsResourceWatcher); DiscoveryRpcCall call = resourceDiscoveryCalls.poll(); call.sendError(Status.UNAVAILABLE.asException()); - verify(ldsResourceWatcher).onError(errorCaptor.capture()); + verify(ldsResourceWatcher, Mockito.timeout(1000).times(1)) + .onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); verify(rdsResourceWatcher).onError(errorCaptor.capture()); verifyStatusWithNodeId(errorCaptor.getValue(), Code.UNAVAILABLE, ""); @@ -3606,15 +3607,10 @@ public void sendToBadUrl() throws Exception { // Setup xdsClient to fail on stream creation XdsClientImpl client = createXdsClient("some. garbage"); - try { - client.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); - fakeClock.forwardTime(20, TimeUnit.SECONDS); - } catch (AssertionError e) { - assertThat(e.getCause()).isInstanceOf(IllegalArgumentException.class); - client.shutdown(); - return; - } - Assert.fail("Expected exception for bad url not thrown"); + client.watchXdsResource(XdsListenerResource.getInstance(), LDS_RESOURCE, ldsResourceWatcher); + fakeClock.forwardTime(20, TimeUnit.SECONDS); + verify(ldsResourceWatcher, Mockito.timeout(5000).times(1)).onError(ArgumentMatchers.any()); + client.shutdown(); } @Test