diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ActiveClientTokenManager.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ActiveClientTokenManager.java index f9ff21755852..4174c9e1b10f 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ActiveClientTokenManager.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ActiveClientTokenManager.java @@ -8,17 +8,18 @@ import com.azure.core.amqp.exception.AmqpResponseCode; import com.azure.core.exception.AzureException; import com.azure.core.util.logging.ClientLogger; +import reactor.core.Disposable; +import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; +import reactor.core.publisher.ReplayProcessor; import java.time.Duration; import java.time.OffsetDateTime; import java.time.ZoneOffset; -import java.util.Timer; -import java.util.TimerTask; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; /** * Manages the re-authorization of the client to the token audience against the CBS node. @@ -30,27 +31,19 @@ public class ActiveClientTokenManager implements TokenManager { private final Mono cbsNode; private final String tokenAudience; private final String scopes; - private final Timer timer; - private final Flux authorizationResults; - private FluxSink sink; + private final ReplayProcessor authorizationResults = ReplayProcessor.create(1); + private final FluxSink authorizationResultsSink = + authorizationResults.sink(FluxSink.OverflowStrategy.BUFFER); + private final EmitterProcessor durationSource = EmitterProcessor.create(); + private final FluxSink durationSourceSink = durationSource.sink(); + private final AtomicReference lastRefreshInterval = new AtomicReference<>(Duration.ofMinutes(1)); - // last refresh interval in milliseconds. - private AtomicLong lastRefreshInterval = new AtomicLong(); + private volatile Disposable subscription; public ActiveClientTokenManager(Mono cbsNode, String tokenAudience, String scopes) { - this.timer = new Timer(tokenAudience + "-tokenManager"); this.cbsNode = cbsNode; this.tokenAudience = tokenAudience; this.scopes = scopes; - this.authorizationResults = Flux.create(sink -> { - if (hasDisposed.get()) { - sink.complete(); - } else { - this.sink = sink; - } - }); - - lastRefreshInterval.set(Duration.ofMinutes(1).getSeconds() * 1000); } /** @@ -82,15 +75,18 @@ public Mono authorize() { // We want to refresh the token when 90% of the time before expiry has elapsed. final long refreshSeconds = (long) Math.floor(between.getSeconds() * 0.9); + // This converts it to milliseconds final long refreshIntervalMS = refreshSeconds * 1000; - lastRefreshInterval.set(refreshIntervalMS); - // If this is the first time authorize is called, the task will not have been scheduled yet. if (!hasScheduled.getAndSet(true)) { - logger.info("Scheduling refresh token task."); - scheduleRefreshTokenTask(refreshIntervalMS); + logger.info("Scheduling refresh token task"); + + final Duration firstInterval = Duration.ofMillis(refreshIntervalMS); + lastRefreshInterval.set(firstInterval); + authorizationResultsSink.next(AmqpResponseCode.ACCEPTED); + subscription = scheduleRefreshTokenTask(firstInterval); } return refreshIntervalMS; @@ -99,52 +95,51 @@ public Mono authorize() { @Override public void close() { - if (!hasDisposed.getAndSet(true)) { - if (this.sink != null) { - this.sink.complete(); - } - - this.timer.cancel(); + if (hasDisposed.getAndSet(true)) { + return; } - } - private void scheduleRefreshTokenTask(Long refreshIntervalInMS) { - try { - timer.schedule(new RefreshAuthorizationToken(), refreshIntervalInMS); - } catch (IllegalStateException e) { - logger.warning("Unable to schedule RefreshAuthorizationToken task.", e); - hasScheduled.set(false); + authorizationResultsSink.complete(); + durationSourceSink.complete(); + + if (subscription != null) { + subscription.dispose(); } } - private class RefreshAuthorizationToken extends TimerTask { - @Override - public void run() { - logger.info("Refreshing authorization token."); - authorize().subscribe( - (Long refreshIntervalInMS) -> { - - if (hasDisposed.get()) { - logger.info("Token manager has been disposed of. Not rescheduling."); - return; - } - - logger.info("Authorization successful. Refreshing token in {} ms.", refreshIntervalInMS); - sink.next(AmqpResponseCode.ACCEPTED); - - scheduleRefreshTokenTask(refreshIntervalInMS); - }, error -> { - if ((error instanceof AmqpException) && ((AmqpException) error).isTransient()) { - logger.error("Error is transient. Rescheduling authorization task.", error); - scheduleRefreshTokenTask(lastRefreshInterval.get()); - } else { - logger.error("Error occurred while refreshing token that is not retriable. Not scheduling" - + " refresh task. Use ActiveClientTokenManager.authorize() to schedule task again.", error); - hasScheduled.set(false); - } - - sink.error(error); + private Disposable scheduleRefreshTokenTask(Duration initialRefresh) { + // EmitterProcessor can queue up an initial refresh interval before any subscribers are received. + durationSourceSink.next(initialRefresh); + + return Flux.switchOnNext(durationSource.map(Flux::interval)) + .flatMap(delay -> { + logger.info("Refreshing token."); + return authorize(); + }) + .onErrorContinue( + error -> (error instanceof AmqpException) && ((AmqpException) error).isTransient(), + (amqpException, interval) -> { + final Duration lastRefresh = lastRefreshInterval.get(); + + logger.error("Error is transient. Rescheduling authorization task at interval {} ms.", + lastRefresh.toMillis(), amqpException); + durationSourceSink.next(lastRefreshInterval.get()); + }) + .subscribe(interval -> { + logger.info("Authorization successful. Refreshing token in {} ms.", interval); + authorizationResultsSink.next(AmqpResponseCode.ACCEPTED); + + final Duration nextRefresh = Duration.ofMillis(interval); + lastRefreshInterval.set(nextRefresh); + durationSourceSink.next(Duration.ofMillis(interval)); + }, error -> { + logger.error("Error occurred while refreshing token that is not retriable. Not scheduling" + + " refresh task. Use ActiveClientTokenManager.authorize() to schedule task again.", error); + hasScheduled.set(false); + durationSourceSink.complete(); + authorizationResultsSink.error(error); + }, () -> { + logger.info("Completed refresh token task."); }); - } } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AzureTokenManagerProvider.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AzureTokenManagerProvider.java index b8540e388d86..ba930e30020a 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AzureTokenManagerProvider.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/AzureTokenManagerProvider.java @@ -46,6 +46,8 @@ public AzureTokenManagerProvider(CbsAuthorizationType authorizationType, String public TokenManager getTokenManager(Mono cbsNodeMono, String resource) { final String scopes = getResourceString(resource); final String tokenAudience = String.format(Locale.US, TOKEN_AUDIENCE_FORMAT, fullyQualifiedNamespace, resource); + + logger.info("Creating new token manager for audience[{}], scopes[{}]", tokenAudience, scopes); return new ActiveClientTokenManager(cbsNodeMono, tokenAudience, scopes); } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ActiveClientTokenManagerTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ActiveClientTokenManagerTest.java index 024d10714b9e..45657132b9a9 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ActiveClientTokenManagerTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ActiveClientTokenManagerTest.java @@ -10,6 +10,7 @@ import com.azure.core.amqp.exception.AmqpResponseCode; import com.azure.core.exception.AzureException; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mock; @@ -25,7 +26,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; -public class ActiveClientTokenManagerTest { +class ActiveClientTokenManagerTest { private static final String AUDIENCE = "an-audience-test"; private static final String SCOPES = "scopes-test"; private static final Duration TIMEOUT = Duration.ofSeconds(4); @@ -34,12 +35,12 @@ public class ActiveClientTokenManagerTest { private ClaimsBasedSecurityNode cbsNode; @BeforeEach - public void setup() { + void setup() { MockitoAnnotations.initMocks(this); } @AfterEach - public void teardown() { + void teardown() { Mockito.framework().clearInlineMocks(); cbsNode = null; } @@ -48,7 +49,7 @@ public void teardown() { * Verify that we can get successes and errors from CBS node. */ @Test - public void getAuthorizationResults() { + void getAuthorizationResults() { // Arrange final Mono cbsNodeMono = Mono.fromCallable(() -> cbsNode); when(cbsNode.authorize(any(), any())).thenReturn(getNextExpiration(3)); @@ -60,8 +61,9 @@ public void getAuthorizationResults() { .then(() -> tokenManager.authorize().block(TIMEOUT)) .expectNext(AmqpResponseCode.ACCEPTED) .expectNext(AmqpResponseCode.ACCEPTED) - .then(tokenManager::close) - .verifyComplete(); + .then(() -> tokenManager.close()) + .expectComplete() + .verify(); } /** @@ -70,7 +72,7 @@ public void getAuthorizationResults() { */ @SuppressWarnings("unchecked") @Test - public void getAuthorizationResultsSuccessFailure() { + void getAuthorizationResultsSuccessFailure() { // Arrange final Mono cbsNodeMono = Mono.fromCallable(() -> cbsNode); final IllegalArgumentException error = new IllegalArgumentException("Some error"); @@ -83,6 +85,7 @@ public void getAuthorizationResultsSuccessFailure() { StepVerifier.create(tokenManager.getAuthorizationResults()) .then(() -> tokenManager.authorize().block(TIMEOUT)) .expectNext(AmqpResponseCode.ACCEPTED) + .expectNext(AmqpResponseCode.ACCEPTED) .expectError(IllegalArgumentException.class) .verifyThenAssertThat() .hasNotDroppedElements() @@ -95,7 +98,7 @@ public void getAuthorizationResultsSuccessFailure() { * Verify that we cannot authorize with CBS node when it has already been disposed of. */ @Test - public void cannotAuthorizeDisposedInstance() { + void cannotAuthorizeDisposedInstance() { // Arrange final Mono cbsNodeMono = Mono.fromCallable(() -> cbsNode); when(cbsNode.authorize(any(), any())).thenReturn(getNextExpiration(2)); @@ -114,31 +117,64 @@ public void cannotAuthorizeDisposedInstance() { */ @SuppressWarnings("unchecked") @Test - public void getAuthorizationResultsRetriableError() { + void getAuthorizationResultsRetriableError() { // Arrange final Mono cbsNodeMono = Mono.fromCallable(() -> cbsNode); - final AmqpException error = new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, "Timed out", + final AmqpException error = new AmqpException(false, AmqpErrorCondition.ARGUMENT_ERROR, + "Non-retryable argument error", new AmqpErrorContext("Test-context-namespace")); - when(cbsNode.authorize(any(), any())).thenReturn(getNextExpiration(3), Mono.error(error), - getNextExpiration(5), getNextExpiration(10), - getNextExpiration(45)); + when(cbsNode.authorize(any(), any())).thenReturn(getNextExpiration(5), Mono.error(error), + getNextExpiration(5)); // Act & Assert try (ActiveClientTokenManager tokenManager = new ActiveClientTokenManager(cbsNodeMono, AUDIENCE, SCOPES)) { StepVerifier.create(tokenManager.getAuthorizationResults()) .then(() -> tokenManager.authorize().block(TIMEOUT)) - .expectError(AmqpException.class) - .verify(); - - StepVerifier.create(tokenManager.getAuthorizationResults()) .expectNext(AmqpResponseCode.ACCEPTED) - .expectNext(AmqpResponseCode.ACCEPTED) - .then(tokenManager::close) - .verifyComplete(); + .expectErrorSatisfies(exception -> { + Assertions.assertTrue(exception instanceof AmqpException); + + AmqpException amqpException = (AmqpException) exception; + Assertions.assertFalse(amqpException.isTransient()); + Assertions.assertEquals(error.getErrorCondition(), amqpException.getErrorCondition()); + }) + .verify(Duration.ofSeconds(30)); } } + + /** + * Verify that the ActiveClientTokenManager does not get more authorization tasks. + */ + @SuppressWarnings("unchecked") + @Test + void getAuthorizationResultsNonRetriableError() { + // Arrange + final Mono cbsNodeMono = Mono.fromCallable(() -> cbsNode); + final AmqpException error = new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, "Test CBS node error.", + new AmqpErrorContext("Test-context-namespace")); + + when(cbsNode.authorize(any(), any())).thenReturn(getNextExpiration(5), Mono.error(error), + getNextExpiration(5), getNextExpiration(10), + getNextExpiration(45)); + + // Act & Assert + final ActiveClientTokenManager tokenManager = new ActiveClientTokenManager(cbsNodeMono, AUDIENCE, SCOPES); + + StepVerifier.create(tokenManager.getAuthorizationResults()) + .then(() -> tokenManager.authorize().block(TIMEOUT)) + .expectNext(AmqpResponseCode.ACCEPTED) + .expectNext(AmqpResponseCode.ACCEPTED) + .then(() -> { + System.out.println("Closing"); + tokenManager.close(); + }) + .expectComplete() + .verify(Duration.ofSeconds(30)); + } + + private Mono getNextExpiration(long secondsToWait) { return Mono.fromCallable(() -> OffsetDateTime.now(ZoneOffset.UTC).plusSeconds(secondsToWait)); } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AzureTokenManagerProviderTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AzureTokenManagerProviderTest.java index 1c59251da4dc..3a150a63c234 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AzureTokenManagerProviderTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/AzureTokenManagerProviderTest.java @@ -26,34 +26,34 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.Mockito.when; -public class AzureTokenManagerProviderTest { +class AzureTokenManagerProviderTest { private static final String HOST_NAME = "foobar.windows.net"; @Mock private ClaimsBasedSecurityNode cbsNode; @BeforeEach - public void setup() { + void setup() { MockitoAnnotations.initMocks(this); } @AfterEach - public void teardown() { + void teardown() { Mockito.framework().clearInlineMocks(); } @Test - public void constructorNullType() { + void constructorNullType() { assertThrows(NullPointerException.class, () -> new AzureTokenManagerProvider(null, HOST_NAME, "something.")); } @Test - public void constructorNullHost() { + void constructorNullHost() { assertThrows(NullPointerException.class, () -> new AzureTokenManagerProvider(CbsAuthorizationType.JSON_WEB_TOKEN, null, "some-scope")); } @Test - public void constructorNullScope() { + void constructorNullScope() { assertThrows(NullPointerException.class, () -> new AzureTokenManagerProvider(CbsAuthorizationType.JSON_WEB_TOKEN, HOST_NAME, null)); } @@ -62,7 +62,7 @@ public void constructorNullScope() { */ @ParameterizedTest @EnumSource(CbsAuthorizationType.class) - public void getResourceString(CbsAuthorizationType authorizationType) { + void getResourceString(CbsAuthorizationType authorizationType) { // Arrange final String scope = "some-scope"; final AzureTokenManagerProvider provider = new AzureTokenManagerProvider(authorizationType, HOST_NAME, scope); @@ -90,7 +90,7 @@ public void getResourceString(CbsAuthorizationType authorizationType) { * is generated from it. */ @Test - public void getCorrectTokenManagerSasToken() { + void getCorrectTokenManagerSasToken() { // Arrange final String aadScope = "some-active-directory-scope"; final AzureTokenManagerProvider provider = new AzureTokenManagerProvider(CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, HOST_NAME, aadScope); @@ -115,7 +115,7 @@ public void getCorrectTokenManagerSasToken() { * Verifies that for JWT token credentials, the scope is the the one that we expect from Azure AAD scope. */ @Test - public void getCorrectTokenManagerJwt() { + void getCorrectTokenManagerJwt() { // Arrange final String aadScope = "some-active-directory-scope"; final AzureTokenManagerProvider provider = new AzureTokenManagerProvider(CbsAuthorizationType.JSON_WEB_TOKEN, HOST_NAME, aadScope); @@ -135,4 +135,28 @@ public void getCorrectTokenManagerJwt() { .expectComplete() .verify(Duration.ofSeconds(10)); } + + /** + * Verify that if the same tokenAudience and scopes are passed in, the same {@link TokenManager} instance is + * returned. + */ + @Test + void differentInstanceReturned() { + // Arrange + final String aadScope = "some-active-directory-scope"; + final AzureTokenManagerProvider provider = new AzureTokenManagerProvider(CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, HOST_NAME, aadScope); + final String entityPath = "event-hub-test-2/partition/2"; + final String entityPath2 = "event-hub-test-2/partition/2"; + final AccessToken token = new AccessToken("a-new-access-token", OffsetDateTime.now().plusMinutes(10)); + final String tokenAudience = String.format(Locale.US, TOKEN_AUDIENCE_FORMAT, HOST_NAME, entityPath); + + when(cbsNode.authorize(argThat(audience -> audience.equals(tokenAudience)), argThat(scope -> scope.equals(tokenAudience)))) + .thenReturn(Mono.just(token.getExpiresAt())); + + // Act + final TokenManager tokenManager = provider.getTokenManager(Mono.just(cbsNode), entityPath); + final TokenManager tokenManager2 = provider.getTokenManager(Mono.just(cbsNode), entityPath2); + + Assertions.assertNotSame(tokenManager, tokenManager2); + } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/IntegrationTestBase.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/IntegrationTestBase.java index 97e3a44a6c44..8c92b110a2c9 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/IntegrationTestBase.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/IntegrationTestBase.java @@ -23,6 +23,7 @@ import org.mockito.Mockito; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import reactor.test.StepVerifier; import java.io.Closeable; import java.io.IOException; @@ -67,6 +68,8 @@ public void setupTest(TestInfo testInfo) { properties = new ConnectionStringProperties(getConnectionString()); + StepVerifier.setDefaultTimeout(TIMEOUT); + beforeTest(); } @@ -75,6 +78,7 @@ public void setupTest(TestInfo testInfo) { @AfterEach public void teardownTest(TestInfo testInfo) { logger.info("[{}]: Performing test clean-up.", testInfo.getDisplayName()); + StepVerifier.resetDefaultTimeout(); afterTest(); // Tear down any inline mocks to avoid memory leaks. diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/InteropAmqpPropertiesTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/InteropAmqpPropertiesTest.java index 19ab8b864657..56ac5df702f5 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/InteropAmqpPropertiesTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/InteropAmqpPropertiesTest.java @@ -26,7 +26,6 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicReference; import static com.azure.core.amqp.AmqpMessageConstant.ENQUEUED_TIME_UTC_ANNOTATION_NAME; import static com.azure.core.amqp.AmqpMessageConstant.OFFSET_ANNOTATION_NAME; @@ -72,7 +71,6 @@ protected void afterTest() { @Test public void interoperableWithDirectProtonAmqpMessage() { // Arrange - final AtomicReference receivedEventData = new AtomicReference<>(); final String messageTrackingValue = UUID.randomUUID().toString(); final HashMap applicationProperties = new HashMap<>(); @@ -119,23 +117,9 @@ public void interoperableWithDirectProtonAmqpMessage() { .filter(event -> isMatchingEvent(event, messageTrackingValue)).take(1).map(PartitionEvent::getData)) .assertNext(event -> { validateAmqpProperties(message, expectedAnnotations, applicationProperties, event); - receivedEventData.set(event); }) .expectComplete() .verify(TIMEOUT); - - Assertions.assertNotNull(receivedEventData.get()); - - System.out.println("Sending another event we received."); - final EventPosition enqueuedTime2 = EventPosition.fromEnqueuedTime(Instant.now()); - producer.send(receivedEventData.get(), sendOptions).block(TIMEOUT); - -// .filter(event -> isMatchingEvent(event, messageTrackingValue)) - StepVerifier.create(consumer.receiveFromPartition(PARTITION_ID, enqueuedTime2) - .take(1).map(PartitionEvent::getData)) - .assertNext(event -> validateAmqpProperties(message, expectedAnnotations, applicationProperties, event)) - .expectComplete() - .verify(TIMEOUT); } private void validateAmqpProperties(Message message, Map messageAnnotations, diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/ProxyReceiveTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/ProxyReceiveTest.java index f6cc19fb49ba..23375ccff0ab 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/ProxyReceiveTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/ProxyReceiveTest.java @@ -101,6 +101,7 @@ public void testReceiverStartOfStreamFilters() { StepVerifier.create(consumer.receiveFromPartition(PARTITION_ID, EventPosition.fromEnqueuedTime(testData.getEnqueuedTime())) .take(NUMBER_OF_EVENTS)) .expectNextCount(NUMBER_OF_EVENTS) - .verifyComplete(); + .expectComplete() + .verify(TIMEOUT); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/ProxySendTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/ProxySendTest.java index 5d689acf78b6..7de2d3698e96 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/ProxySendTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/ProxySendTest.java @@ -26,7 +26,7 @@ import java.util.UUID; public class ProxySendTest extends IntegrationTestBase { - private static final int PROXY_PORT = 8899; + private static final int PROXY_PORT = 8999; private static final String PARTITION_ID = "1"; private static final int NUMBER_OF_EVENTS = 25; @@ -97,13 +97,15 @@ public void sendEvents() { try { // Act StepVerifier.create(producer.send(events, options)) - .verifyComplete(); + .expectComplete() + .verify(TIMEOUT); // Assert StepVerifier.create(consumer.receiveFromPartition(PARTITION_ID, EventPosition.fromEnqueuedTime(sendTime)) .filter(x -> TestUtils.isMatchingEvent(x, messageId)).take(NUMBER_OF_EVENTS)) .expectNextCount(NUMBER_OF_EVENTS) - .verifyComplete(); + .expectComplete() + .verify(TIMEOUT); } finally { dispose(producer, consumer); }