From 8e2de6d3cea79b4ef5ebaba7280d26a2e7324134 Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 1 Sep 2020 12:12:27 -0700 Subject: [PATCH 01/13] Removing public from LockRenewalOperation. --- .../messaging/servicebus/LockRenewalOperation.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java index 353c42286638..b644df9fb30f 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java @@ -23,7 +23,7 @@ /** * Represents a renewal session or message lock renewal operation that. */ -public class LockRenewalOperation implements AutoCloseable { +class LockRenewalOperation implements AutoCloseable { private final ClientLogger logger = new ClientLogger(LockRenewalOperation.class); private final AtomicBoolean isDisposed = new AtomicBoolean(); private final AtomicReference lockedUntil = new AtomicReference<>(); @@ -81,7 +81,7 @@ public class LockRenewalOperation implements AutoCloseable { * * @return the datetime the message or session is locked until. */ - public OffsetDateTime getLockedUntil() { + OffsetDateTime getLockedUntil() { return lockedUntil.get(); } @@ -90,7 +90,7 @@ public OffsetDateTime getLockedUntil() { * * @return The message lock token or {@code null} if a session is being renewed instead. */ - public String getLockToken() { + String getLockToken() { return isSession ? null : lockToken; } @@ -99,7 +99,7 @@ public String getLockToken() { * * @return The session id or {@code null} if it is not a session renewal. */ - public String getSessionId() { + String getSessionId() { return isSession ? lockToken : null; } @@ -108,7 +108,7 @@ public String getSessionId() { * * @return The current status of the renewal operation. */ - public LockRenewalStatus getStatus() { + LockRenewalStatus getStatus() { return status.get(); } @@ -117,7 +117,7 @@ public LockRenewalStatus getStatus() { * * @return the exception if an error occurred whilst renewing the message or session lock, otherwise {@code null}. */ - public Throwable getThrowable() { + Throwable getThrowable() { return throwable.get(); } From aad313f93b0b2c3997362d5d294b4a6f6c82b6a3 Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 1 Sep 2020 12:48:17 -0700 Subject: [PATCH 02/13] Update LockRenewalOperation to return a completion Mono. --- .../servicebus/LockRenewalOperation.java | 49 ++++++++++++------- .../ServiceBusReceiverAsyncClient.java | 9 ++-- 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java index b644df9fb30f..fb1a6c6accbe 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java @@ -6,7 +6,6 @@ import com.azure.messaging.servicebus.implementation.MessageUtils; import com.azure.messaging.servicebus.models.LockRenewalStatus; import reactor.core.Disposable; -import reactor.core.Disposables; import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; @@ -30,6 +29,7 @@ class LockRenewalOperation implements AutoCloseable { private final AtomicReference throwable = new AtomicReference<>(); private final AtomicReference status = new AtomicReference<>(LockRenewalStatus.RUNNING); private final MonoProcessor cancellationProcessor = MonoProcessor.create(); + private final Mono completionMono; private final String lockToken; private final boolean isSession; @@ -73,7 +73,31 @@ class LockRenewalOperation implements AutoCloseable { } this.lockedUntil.set(lockedUntil); - this.subscription = getRenewLockOperation(lockedUntil, maxLockRenewalDuration); + final Flux renewLockOperation = getRenewLockOperation(lockedUntil, maxLockRenewalDuration); + this.subscription = renewLockOperation.subscribe(until -> this.lockedUntil.set(until), + error -> { + logger.error("token[{}]. Error occurred while renewing lock token.", error); + status.set(LockRenewalStatus.FAILED); + throwable.set(error); + cancellationProcessor.onComplete(); + }, () -> { + if (status.compareAndSet(LockRenewalStatus.RUNNING, LockRenewalStatus.COMPLETE)) { + logger.verbose("token[{}]. Renewing session lock task completed.", lockToken); + } + + cancellationProcessor.onComplete(); + }); + + this.completionMono = renewLockOperation.then(); + } + + /** + * Gets a mono that completes when the operation does. + * + * @return A mono that completes when the renewal operation does. + */ + Mono getCompletionOperation() { + return completionMono; } /** @@ -146,10 +170,11 @@ public void close() { * @param maxLockRenewalDuration Duration to renew lock for. * @return The subscription for the operation. */ - private Disposable getRenewLockOperation(OffsetDateTime initialLockedUntil, Duration maxLockRenewalDuration) { + private Flux getRenewLockOperation(OffsetDateTime initialLockedUntil, + Duration maxLockRenewalDuration) { if (maxLockRenewalDuration.isZero()) { status.set(LockRenewalStatus.COMPLETE); - return Disposables.single(); + return Flux.empty(); } final OffsetDateTime now = OffsetDateTime.now(); @@ -174,7 +199,6 @@ private Disposable getRenewLockOperation(OffsetDateTime initialLockedUntil, Dura sink.next(initialInterval); final Flux cancellationSignals = Flux.first(cancellationProcessor, Mono.delay(maxLockRenewalDuration)); - return Flux.switchOnNext(emitterProcessor.map(interval -> Mono.delay(interval) .thenReturn(Flux.create(s -> s.next(interval))))) .takeUntilOther(cancellationSignals) @@ -189,19 +213,6 @@ private Disposable getRenewLockOperation(OffsetDateTime initialLockedUntil, Dura sink.next(MessageUtils.adjustServerTimeout(next)); return offsetDateTime; - }) - .subscribe(until -> lockedUntil.set(until), - error -> { - logger.error("token[{}]. Error occurred while renewing lock token.", error); - status.set(LockRenewalStatus.FAILED); - throwable.set(error); - cancellationProcessor.onComplete(); - }, () -> { - if (status.compareAndSet(LockRenewalStatus.RUNNING, LockRenewalStatus.COMPLETE)) { - logger.verbose("token[{}]. Renewing session lock task completed.", lockToken); - } - - cancellationProcessor.onComplete(); - }); + }); } } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java index 8ae330e745d4..510b877f43f5 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClient.java @@ -449,7 +449,7 @@ public Mono deadLetter(ServiceBusReceivedMessage message, DeadLetterOption * @throws IllegalArgumentException if {@code lockToken} is an empty string. * @throws IllegalStateException if the receiver is a session receiver or the receiver is disposed. */ - public LockRenewalOperation getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDuration) { + public Mono getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDuration) { if (isDisposed.get()) { throw logger.logExceptionAsError(new IllegalStateException( String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "getAutoRenewMessageLock"))); @@ -470,7 +470,8 @@ public LockRenewalOperation getAutoRenewMessageLock(String lockToken, Duration m final LockRenewalOperation operation = new LockRenewalOperation(lockToken, maxLockRenewalDuration, false, this::renewMessageLock); renewalContainer.addOrUpdate(lockToken, Instant.now().plus(maxLockRenewalDuration), operation); - return operation; + + return operation.getCompletionOperation(); } /** @@ -484,7 +485,7 @@ public LockRenewalOperation getAutoRenewMessageLock(String lockToken, Duration m * @throws IllegalArgumentException if {@code lockToken} is an empty string. * @throws IllegalStateException if the receiver is a non-session receiver or the receiver is disposed. */ - public LockRenewalOperation getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration) { + public Mono getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration) { if (isDisposed.get()) { throw logger.logExceptionAsError(new IllegalStateException( String.format(INVALID_OPERATION_DISPOSED_RECEIVER, "getAutoRenewSessionLock"))); @@ -506,7 +507,7 @@ public LockRenewalOperation getAutoRenewSessionLock(String sessionId, Duration m this::renewSessionLock); renewalContainer.addOrUpdate(sessionId, Instant.now().plus(maxLockRenewalDuration), operation); - return operation; + return operation.getCompletionOperation(); } /** From d08cef73dfe5492f4a71f03e50f67ddcac056d39 Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 1 Sep 2020 13:20:13 -0700 Subject: [PATCH 03/13] Updating lockrenewal operation. --- .../messaging/servicebus/LockRenewalOperation.java | 4 ++-- .../messaging/servicebus/ServiceBusReceiverClient.java | 10 +++++++--- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java index fb1a6c6accbe..377708036306 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java @@ -73,7 +73,9 @@ class LockRenewalOperation implements AutoCloseable { } this.lockedUntil.set(lockedUntil); + final Flux renewLockOperation = getRenewLockOperation(lockedUntil, maxLockRenewalDuration); + this.completionMono = renewLockOperation.then(); this.subscription = renewLockOperation.subscribe(until -> this.lockedUntil.set(until), error -> { logger.error("token[{}]. Error occurred while renewing lock token.", error); @@ -87,8 +89,6 @@ class LockRenewalOperation implements AutoCloseable { cancellationProcessor.onComplete(); }); - - this.completionMono = renewLockOperation.then(); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index f0dc9c1ad7d9..1c3ba666bf6b 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -18,6 +18,7 @@ import java.util.Objects; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; /** * A synchronous receiver responsible for receiving {@link ServiceBusReceivedMessage} from a specific queue or @@ -259,8 +260,11 @@ public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions dead * @throws IllegalArgumentException if {@code lockToken} is an empty string. * @throws IllegalStateException if the receiver is a session receiver or the receiver is disposed. */ - public LockRenewalOperation getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDuration) { - return asyncClient.getAutoRenewMessageLock(lockToken, maxLockRenewalDuration); + public void getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDuration, Consumer onError) { + final Consumer throwableConsumer = onError != null ? onError : e -> { + + }; + asyncClient.getAutoRenewMessageLock(lockToken, maxLockRenewalDuration); } /** @@ -273,7 +277,7 @@ public LockRenewalOperation getAutoRenewMessageLock(String lockToken, Duration m * @throws IllegalArgumentException if {@code lockToken} is an empty string. * @throws IllegalStateException if the receiver is a non-session receiver or the receiver is disposed. */ - public LockRenewalOperation getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration) { + public void getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration, Consumer onError) { return asyncClient.getAutoRenewSessionLock(sessionId, maxLockRenewalDuration); } From c83db7ed672bdfa3ffec7beecc458348d966446f Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 1 Sep 2020 15:12:45 -0700 Subject: [PATCH 04/13] Fixing tests. --- .../servicebus/ServiceBusReceiverClient.java | 19 +++++--- ...BusReceiverAsyncClientIntegrationTest.java | 44 +++++++------------ .../ServiceBusReceiverAsyncClientTest.java | 6 ++- 3 files changed, 34 insertions(+), 35 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index 1c3ba666bf6b..2c9505c98444 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -255,16 +255,18 @@ public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions dead * * @param lockToken Lock token of the message. * @param maxLockRenewalDuration Maximum duration to keep renewing the lock token. - * @return A lock renewal operation for the message. * @throws NullPointerException if {@code lockToken} or {@code maxLockRenewalDuration} is null. * @throws IllegalArgumentException if {@code lockToken} is an empty string. * @throws IllegalStateException if the receiver is a session receiver or the receiver is disposed. */ public void getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDuration, Consumer onError) { - final Consumer throwableConsumer = onError != null ? onError : e -> { + final Consumer throwableConsumer = onError != null + ? onError : + error -> logger.warning("Exception occurred while renewing lock token: '{}'.", lockToken, error); - }; - asyncClient.getAutoRenewMessageLock(lockToken, maxLockRenewalDuration); + asyncClient.getAutoRenewMessageLock(lockToken, maxLockRenewalDuration).subscribe( + v -> logger.verbose("Completed renewing lock token: '{}'", lockToken), + throwableConsumer); } /** @@ -272,13 +274,18 @@ public void getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDur * * @param sessionId Id for the session to renew. * @param maxLockRenewalDuration Maximum duration to keep renewing the lock token. - * @return A lock renewal operation for the message. * @throws NullPointerException if {@code sessionId} or {@code maxLockRenewalDuration} is null. * @throws IllegalArgumentException if {@code lockToken} is an empty string. * @throws IllegalStateException if the receiver is a non-session receiver or the receiver is disposed. */ public void getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration, Consumer onError) { - return asyncClient.getAutoRenewSessionLock(sessionId, maxLockRenewalDuration); + final Consumer throwableConsumer = onError != null + ? onError : + error -> logger.warning("Exception occurred while renewing session: '{}'.", sessionId, error); + + asyncClient.getAutoRenewSessionLock(sessionId, maxLockRenewalDuration).subscribe( + v -> logger.verbose("Completed renewing session: '{}'", sessionId), + throwableConsumer); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java index dbf904550cda..b433106b7d46 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java @@ -930,37 +930,27 @@ void renewMessageLock(MessagingEntityType entityType) throws InterruptedExceptio final String messageId = UUID.randomUUID().toString(); final ServiceBusMessage message = getMessage(messageId, isSessionEnabled); - sendMessage(message).block(TIMEOUT); - - // Assert & Act - StepVerifier.create(receiver.receiveMessages()) - .assertNext(receivedContext -> { - final ServiceBusReceivedMessage receivedMessage = receivedContext.getMessage(); - assertNotNull(receivedMessage); - - final OffsetDateTime lockedUntil = receivedMessage.getLockedUntil(); - assertNotNull(lockedUntil); + final ServiceBusReceivedMessageContext receivedContext = sendMessage(message) + .then(receiver.receiveMessages().next()) + .block(TIMEOUT); + assertNotNull(receivedContext); - final LockRenewalOperation operation = receiver.getAutoRenewMessageLock( - receivedMessage.getLockToken(), maximumDuration); + final ServiceBusReceivedMessage receivedMessage = receivedContext.getMessage(); + assertNotNull(receivedMessage); - assertEquals(LockRenewalStatus.RUNNING, operation.getStatus()); - try { - Thread.sleep(sleepDuration.toMillis()); + final OffsetDateTime lockedUntil = receivedMessage.getLockedUntil(); + assertNotNull(lockedUntil); - assertTrue(lockedUntil.isBefore(operation.getLockedUntil())); - assertEquals(LockRenewalStatus.COMPLETE, operation.getStatus()); - } catch (InterruptedException e) { - logger.error("Could not sleep.", e); + // Assert & Act + StepVerifier.create(receiver.getAutoRenewMessageLock(receivedMessage.getLockToken(), maximumDuration)) + .thenAwait(sleepDuration) + .then(() -> { + logger.info("Completing message."); + int numberCompleted = completeMessages(receiver, Collections.singletonList(receivedMessage)); - operation.close(); - assertEquals(LockRenewalStatus.CANCELLED, operation.getStatus()); - } finally { - int numberCompleted = completeMessages(receiver, - Collections.singletonList(receivedMessage)); - messagesPending.addAndGet(-numberCompleted); - } - }).thenCancel() + messagesPending.addAndGet(-numberCompleted); + }) + .expectComplete() .verify(Duration.ofMinutes(3)); } diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java index 788f6507ab03..4d45ae109540 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java @@ -58,6 +58,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.function.Consumer; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -103,7 +104,6 @@ class ServiceBusReceiverAsyncClientTest { private ServiceBusConnectionProcessor connectionProcessor; private ServiceBusReceiverAsyncClient receiver; private ServiceBusReceiverAsyncClient sessionReceiver; - private Duration maxAutoLockRenewalDuration; @Mock private ServiceBusReactorReceiver amqpReceiveLink; @@ -127,6 +127,8 @@ class ServiceBusReceiverAsyncClientTest { private Runnable onClientClose; @Mock private Function> renewalOperation; + @Mock + private Consumer onErrorConsumer; @BeforeAll static void beforeAll() { @@ -752,7 +754,7 @@ void autoRenewMessageLock() throws InterruptedException { .thenReturn(Mono.fromCallable(() -> Instant.now().plus(renewalPeriod))); // Act & Assert - final LockRenewalOperation operation = receiver.getAutoRenewMessageLock(lockToken, maxDuration); + receiver.getAutoRenewMessageLock(lockToken, maxDuration, ); Thread.sleep(totalSleepPeriod.toMillis()); logger.info("Finished renewals for first sleep."); From ffb219e0d2bb4aa0c153aea9bcf3a89663c394a8 Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 1 Sep 2020 16:38:53 -0700 Subject: [PATCH 05/13] Fix conflicting names in LockRenewalOperation. --- .../messaging/servicebus/LockRenewalOperation.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java index 377708036306..3d94987df124 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java @@ -53,18 +53,18 @@ class LockRenewalOperation implements AutoCloseable { * Creates a new lock renewal operation. * * @param lockToken Lock or session id to renew. - * @param lockedUntil The initial period the message or session is locked until. + * @param tokenLockedUntil The initial period the message or session is locked until. * @param maxLockRenewalDuration The maximum duration this lock should be renewed. * @param isSession Whether the lock represents a session lock or message lock. * @param renewalOperation The renewal operation to call. */ LockRenewalOperation(String lockToken, Duration maxLockRenewalDuration, boolean isSession, - Function> renewalOperation, OffsetDateTime lockedUntil) { + Function> renewalOperation, OffsetDateTime tokenLockedUntil) { this.lockToken = Objects.requireNonNull(lockToken, "'lockToken' cannot be null."); this.renewalOperation = Objects.requireNonNull(renewalOperation, "'renewalOperation' cannot be null."); this.isSession = isSession; - Objects.requireNonNull(lockedUntil, "'lockedUntil cannot be null.'"); + Objects.requireNonNull(tokenLockedUntil, "'lockedUntil cannot be null.'"); Objects.requireNonNull(maxLockRenewalDuration, "'maxLockRenewalDuration' cannot be null."); if (maxLockRenewalDuration.isNegative()) { @@ -72,9 +72,9 @@ class LockRenewalOperation implements AutoCloseable { "'maxLockRenewalDuration' cannot be negative.")); } - this.lockedUntil.set(lockedUntil); + this.lockedUntil.set(tokenLockedUntil); - final Flux renewLockOperation = getRenewLockOperation(lockedUntil, maxLockRenewalDuration); + final Flux renewLockOperation = getRenewLockOperation(tokenLockedUntil, maxLockRenewalDuration); this.completionMono = renewLockOperation.then(); this.subscription = renewLockOperation.subscribe(until -> this.lockedUntil.set(until), error -> { From 0c6eabfb4898fc9e3d6912a1f31038f893f35a79 Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 1 Sep 2020 16:39:47 -0700 Subject: [PATCH 06/13] Fix test breaks in ServiceBusReceiverAsyncClientTest --- .../ServiceBusReceiverAsyncClientTest.java | 47 ++++++------------- 1 file changed, 14 insertions(+), 33 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java index 4d45ae109540..5771ef63ffea 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java @@ -22,7 +22,6 @@ import com.azure.messaging.servicebus.implementation.ServiceBusConnectionProcessor; import com.azure.messaging.servicebus.implementation.ServiceBusManagementNode; import com.azure.messaging.servicebus.implementation.ServiceBusReactorReceiver; -import com.azure.messaging.servicebus.models.LockRenewalStatus; import com.azure.messaging.servicebus.models.ReceiveMode; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.transport.DeliveryState.DeliveryStateType; @@ -58,16 +57,11 @@ import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.function.Consumer; -import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import static com.azure.messaging.servicebus.TestUtils.getMessage; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -125,10 +119,6 @@ class ServiceBusReceiverAsyncClientTest { private ServiceBusReceivedMessage receivedMessage2; @Mock private Runnable onClientClose; - @Mock - private Function> renewalOperation; - @Mock - private Consumer onErrorConsumer; @BeforeAll static void beforeAll() { @@ -248,7 +238,7 @@ void peekWithSequenceOneMessage() { void receivesNumberOfEvents() { // Arrange final int numberOfEvents = 1; - final List messages = getMessages(10); + final List messages = getMessages(); ServiceBusReceivedMessage receivedMessage = mock(ServiceBusReceivedMessage.class); when(receivedMessage.getLockToken()).thenReturn(UUID.randomUUID().toString()); @@ -739,7 +729,7 @@ void cannotRenewMessageLockInSession() { * Verifies that we can auto-renew a message lock. */ @Test - void autoRenewMessageLock() throws InterruptedException { + void autoRenewMessageLock() { // Arrange final Duration maxDuration = Duration.ofSeconds(8); final Duration renewalPeriod = Duration.ofSeconds(3); @@ -754,15 +744,11 @@ void autoRenewMessageLock() throws InterruptedException { .thenReturn(Mono.fromCallable(() -> Instant.now().plus(renewalPeriod))); // Act & Assert - receiver.getAutoRenewMessageLock(lockToken, maxDuration, ); - Thread.sleep(totalSleepPeriod.toMillis()); - logger.info("Finished renewals for first sleep."); - - // Assert - assertEquals(LockRenewalStatus.COMPLETE, operation.getStatus()); - assertNull(operation.getThrowable()); - assertTrue(startTime.isBefore(operation.getLockedUntil()), String.format( - "initial lockedUntil[%s] is not before lockedUntil[%s]", startTime, operation.getLockedUntil())); + StepVerifier.create(receiver.getAutoRenewMessageLock(lockToken, maxDuration)) + .thenAwait(totalSleepPeriod) + .then(() -> logger.info("Finished renewals for first sleep.")) + .expectComplete() + .verify(Duration.ofSeconds(5)); verify(managementNode, Mockito.atMost(atMost)).renewMessageLock(lockToken, null); } @@ -772,7 +758,7 @@ void autoRenewMessageLock() throws InterruptedException { * Verifies that we can auto-renew a message lock. */ @Test - void autoRenewSessionLock() throws InterruptedException { + void autoRenewSessionLock() { // Arrange final Duration maxDuration = Duration.ofSeconds(8); final Duration renewalPeriod = Duration.ofSeconds(3); @@ -787,23 +773,18 @@ void autoRenewSessionLock() throws InterruptedException { .thenReturn(Mono.fromCallable(() -> Instant.now().plus(renewalPeriod))); // Act & Assert - final LockRenewalOperation operation = sessionReceiver.getAutoRenewSessionLock(sessionId, maxDuration); - Thread.sleep(totalSleepPeriod.toMillis()); - logger.info("Finished renewals for first sleep."); - - // Assert - assertEquals(LockRenewalStatus.COMPLETE, operation.getStatus()); - assertNull(operation.getThrowable()); - assertTrue(startTime.isBefore(operation.getLockedUntil()), String.format( - "initial lockedUntil[%s] is not before lockedUntil[%s]", startTime, operation.getLockedUntil())); + StepVerifier.create(sessionReceiver.getAutoRenewSessionLock(sessionId, maxDuration)) + .thenAwait(totalSleepPeriod) + .then(() -> logger.info("Finished renewals for first sleep.")) + .expectComplete().verify(Duration.ofSeconds(5)); verify(managementNode, Mockito.atMost(atMost)).renewSessionLock(sessionId, null); } - private List getMessages(int numberOfEvents) { + private List getMessages() { final Map map = Collections.singletonMap("SAMPLE_HEADER", "foo"); - return IntStream.range(0, numberOfEvents) + return IntStream.range(0, 10) .mapToObj(index -> getMessage(PAYLOAD_BYTES, messageTrackingUUID, map)) .collect(Collectors.toList()); } From 18467b455abd60b3a32338a0c8bc9711d7ee140e Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 1 Sep 2020 16:48:05 -0700 Subject: [PATCH 07/13] Caching response from Flux so it is not reinvoked. --- .../com/azure/messaging/servicebus/LockRenewalOperation.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java index 3d94987df124..353595af9c63 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java @@ -74,7 +74,9 @@ class LockRenewalOperation implements AutoCloseable { this.lockedUntil.set(tokenLockedUntil); - final Flux renewLockOperation = getRenewLockOperation(tokenLockedUntil, maxLockRenewalDuration); + final Flux renewLockOperation = getRenewLockOperation(tokenLockedUntil, + maxLockRenewalDuration).cache(Duration.ofMinutes(2)); + this.completionMono = renewLockOperation.then(); this.subscription = renewLockOperation.subscribe(until -> this.lockedUntil.set(until), error -> { From 9da915cf0a1ec3ee7f15b461f4e5e320d2bf93d8 Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 1 Sep 2020 16:49:02 -0700 Subject: [PATCH 08/13] Removing unused variables in test. --- .../servicebus/ServiceBusReceiverAsyncClientTest.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java index 5771ef63ffea..a0e6f6d2ef81 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientTest.java @@ -734,7 +734,6 @@ void autoRenewMessageLock() { final Duration maxDuration = Duration.ofSeconds(8); final Duration renewalPeriod = Duration.ofSeconds(3); final String lockToken = "some-token"; - final OffsetDateTime startTime = OffsetDateTime.now(); // At most 4 times because we renew the lock before it expires (by some seconds). final int atMost = 5; @@ -753,9 +752,8 @@ void autoRenewMessageLock() { verify(managementNode, Mockito.atMost(atMost)).renewMessageLock(lockToken, null); } - /** - * Verifies that we can auto-renew a message lock. + * Verifies that we can auto-renew a session lock. */ @Test void autoRenewSessionLock() { @@ -763,7 +761,6 @@ void autoRenewSessionLock() { final Duration maxDuration = Duration.ofSeconds(8); final Duration renewalPeriod = Duration.ofSeconds(3); final String sessionId = "some-token"; - final OffsetDateTime startTime = OffsetDateTime.now(); // At most 4 times because we renew the lock before it expires (by some seconds). final int atMost = 5; @@ -776,7 +773,8 @@ void autoRenewSessionLock() { StepVerifier.create(sessionReceiver.getAutoRenewSessionLock(sessionId, maxDuration)) .thenAwait(totalSleepPeriod) .then(() -> logger.info("Finished renewals for first sleep.")) - .expectComplete().verify(Duration.ofSeconds(5)); + .expectComplete() + .verify(Duration.ofSeconds(5)); verify(managementNode, Mockito.atMost(atMost)).renewSessionLock(sessionId, null); } From a246917179b9d2d35e6b891f1ff85c53429bd0bd Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 1 Sep 2020 17:02:02 -0700 Subject: [PATCH 09/13] Fixing tests. --- .../servicebus/LockRenewalOperation.java | 4 +- .../servicebus/LockRenewalOperationTest.java | 37 +++++++++++-------- 2 files changed, 25 insertions(+), 16 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java index 353595af9c63..197cffc1c6bf 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java @@ -75,7 +75,9 @@ class LockRenewalOperation implements AutoCloseable { this.lockedUntil.set(tokenLockedUntil); final Flux renewLockOperation = getRenewLockOperation(tokenLockedUntil, - maxLockRenewalDuration).cache(Duration.ofMinutes(2)); + maxLockRenewalDuration) + .takeUntilOther(cancellationProcessor) + .cache(Duration.ofMinutes(2)); this.completionMono = renewLockOperation.then(); this.subscription = renewLockOperation.subscribe(until -> this.lockedUntil.set(until), diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/LockRenewalOperationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/LockRenewalOperationTest.java index 467b5d23988f..07830ed53d3c 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/LockRenewalOperationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/LockRenewalOperationTest.java @@ -14,12 +14,12 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; import java.time.Duration; import java.time.OffsetDateTime; import java.util.ArrayDeque; import java.util.Deque; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -87,7 +87,7 @@ void constructor(boolean isSession) { * Verify that when an error occurs, it is displayed. */ @Test - void errors() throws InterruptedException { + void errors() { // Arrange final boolean isSession = true; final Duration renewalPeriod = Duration.ofSeconds(2); @@ -119,7 +119,11 @@ void errors() throws InterruptedException { operation = new LockRenewalOperation(A_LOCK_TOKEN, maxDuration, isSession, renewalOperation, lockedUntil); // Act - TimeUnit.MILLISECONDS.sleep(totalSleepPeriod.toMillis()); + StepVerifier.create(operation.getCompletionOperation()) + .thenAwait(totalSleepPeriod) + .expectErrorMatches(e -> e instanceof IllegalAccessException + && e.getMessage().equals(testError.getMessage())) + .verify(); // Assert assertEquals(LockRenewalStatus.FAILED, operation.getStatus()); @@ -131,7 +135,7 @@ void errors() throws InterruptedException { * Verifies that it stops renewing after the duration has elapsed. */ @Test - void completes() throws InterruptedException { + void completes() { // Arrange final Duration maxDuration = Duration.ofSeconds(8); final Duration renewalPeriod = Duration.ofSeconds(3); @@ -147,10 +151,11 @@ void completes() throws InterruptedException { operation = new LockRenewalOperation(A_LOCK_TOKEN, maxDuration, false, renewalOperation, lockedUntil); // Act - Thread.sleep(totalSleepPeriod.toMillis()); - logger.info("Finished renewals for first sleep."); - Thread.sleep(2000); - System.out.println("Finished second sleep. Should not have any more renewals."); + StepVerifier.create(operation.getCompletionOperation()) + .thenAwait(totalSleepPeriod) + .then(() -> logger.info("Finished renewals for first sleep.")) + .expectComplete() + .verify(Duration.ofMillis(2000)); // Assert assertEquals(LockRenewalStatus.COMPLETE, operation.getStatus()); @@ -165,7 +170,7 @@ void completes() throws InterruptedException { * Verify that we can cancel the operation. */ @Test - void cancellation() throws InterruptedException { + void cancellation() { // Arrange final Duration maxDuration = Duration.ofSeconds(20); final Duration renewalPeriod = Duration.ofSeconds(3); @@ -181,12 +186,14 @@ void cancellation() throws InterruptedException { operation = new LockRenewalOperation(A_LOCK_TOKEN, maxDuration, false, renewalOperation, lockedUntil); // Act - Thread.sleep(totalSleepPeriod.toMillis()); - logger.info("Finished renewals for first sleep. Cancelling"); - operation.close(); - - Thread.sleep(2000); - System.out.println("Finished second sleep. Should not have any more renewals."); + StepVerifier.create(operation.getCompletionOperation()) + .thenAwait(totalSleepPeriod) + .then(() -> { + logger.info("Finished renewals for first sleep. Cancelling"); + operation.close(); + }) + .expectComplete() + .verify(Duration.ofMillis(1000)); // Assert assertEquals(LockRenewalStatus.CANCELLED, operation.getStatus()); From 95c2ebb98dd3111163acdf2d920d6a41ab7e9653 Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 1 Sep 2020 17:20:07 -0700 Subject: [PATCH 10/13] Adding sync tests. --- .../ServiceBusReceiverClientTest.java | 155 +++++++++++++++++- 1 file changed, 146 insertions(+), 9 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java index de17dcb097c1..c200b1f0fcf5 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverClientTest.java @@ -15,6 +15,7 @@ import org.mockito.MockitoAnnotations; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.test.publisher.TestPublisher; import java.time.Duration; import java.time.Instant; @@ -25,17 +26,21 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.fail; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -59,9 +64,10 @@ class ServiceBusReceiverClientTest { private Map propertiesToModify; @Mock private ServiceBusTransactionContext transactionContext; - @Mock private ServiceBusReceivedMessage message; + @Mock + private Consumer onErrorConsumer; @BeforeEach void setup() { @@ -126,6 +132,138 @@ void abandonMessageWithProperties() { verify(asyncClient).abandon(eq(message), eq(propertiesToModify)); } + /** + * Verifies that we can auto-renew a message lock. + */ + @Test + void autoRenewMessageLock() { + // Arrange + final Duration maxDuration = Duration.ofSeconds(8); + final TestPublisher publisher = TestPublisher.create(); + + doAnswer(answer -> { + fail("On error should not have been invoked."); + return null; + }).when(onErrorConsumer).accept(any()); + when(asyncClient.getAutoRenewMessageLock(LOCK_TOKEN, maxDuration)).thenReturn(publisher.mono()); + + // Act + client.getAutoRenewMessageLock(LOCK_TOKEN, maxDuration, onErrorConsumer); + + // Assert + verify(asyncClient).getAutoRenewMessageLock(LOCK_TOKEN, maxDuration); + } + + /** + * Verifies that we can auto-renew a message lock and it calls the error consumer. + */ + @Test + void autoRenewMessageLockFails() { + // Arrange + final Duration maxDuration = Duration.ofSeconds(8); + final TestPublisher publisher = TestPublisher.create(); + final Throwable testError = new IllegalAccessException("Some exception"); + + when(asyncClient.getAutoRenewMessageLock(LOCK_TOKEN, maxDuration)).thenReturn(publisher.mono()); + + client.getAutoRenewMessageLock(LOCK_TOKEN, maxDuration, onErrorConsumer); + + // Act + publisher.error(testError); + + // Assert + verify(asyncClient).getAutoRenewMessageLock(LOCK_TOKEN, maxDuration); + verify(onErrorConsumer).accept(testError); + } + + /** + * Verifies that we can auto-renew a message lock and it will not fail with an NPE when we have a null onError. + */ + @Test + void autoRenewMessageLockFailsNull() { + // Arrange + final Duration maxDuration = Duration.ofSeconds(8); + final TestPublisher publisher = TestPublisher.create(); + final Throwable testError = new IllegalAccessException("Some exception"); + + when(asyncClient.getAutoRenewMessageLock(LOCK_TOKEN, maxDuration)).thenReturn(publisher.mono()); + + client.getAutoRenewMessageLock(LOCK_TOKEN, maxDuration, null); + + // Act + publisher.error(testError); + + // Assert + verify(asyncClient).getAutoRenewMessageLock(LOCK_TOKEN, maxDuration); + verify(onErrorConsumer, never()).accept(testError); + } + + /** + * Verifies that we can auto-renew a session lock. + */ + @Test + void autoRenewSessionLock() { + // Arrange + final Duration maxDuration = Duration.ofSeconds(8); + final TestPublisher publisher = TestPublisher.create(); + + doAnswer(answer -> { + fail("On error should not have been invoked."); + return null; + }).when(onErrorConsumer).accept(any()); + when(asyncClient.getAutoRenewSessionLock(LOCK_TOKEN, maxDuration)).thenReturn(publisher.mono()); + + // Act + client.getAutoRenewSessionLock(LOCK_TOKEN, maxDuration, onErrorConsumer); + + // Assert + verify(asyncClient).getAutoRenewSessionLock(LOCK_TOKEN, maxDuration); + } + + /** + * Verifies that we can auto-renew a session lock and it calls the error consumer. + */ + @Test + void autoRenewSessionLockFails() { + // Arrange + final Duration maxDuration = Duration.ofSeconds(8); + final TestPublisher publisher = TestPublisher.create(); + final Throwable testError = new IllegalAccessException("Some exception"); + + when(asyncClient.getAutoRenewSessionLock(LOCK_TOKEN, maxDuration)).thenReturn(publisher.mono()); + + client.getAutoRenewSessionLock(LOCK_TOKEN, maxDuration, onErrorConsumer); + + // Act + publisher.error(testError); + + // Assert + verify(asyncClient).getAutoRenewSessionLock(LOCK_TOKEN, maxDuration); + verify(onErrorConsumer).accept(testError); + } + + /** + * Verifies that we can auto-renew a message lock and it will not fail with an NPE when we have a null onError. + */ + @Test + void autoRenewSessionLockFailsNull() { + // Arrange + final Duration maxDuration = Duration.ofSeconds(8); + final TestPublisher publisher = TestPublisher.create(); + final Throwable testError = new IllegalAccessException("Some exception"); + + when(asyncClient.getAutoRenewSessionLock(LOCK_TOKEN, maxDuration)).thenReturn(publisher.mono()); + + client.getAutoRenewSessionLock(LOCK_TOKEN, maxDuration, null); + + // Act + publisher.error(testError); + + // Assert + verify(asyncClient).getAutoRenewSessionLock(LOCK_TOKEN, maxDuration); + verify(onErrorConsumer, never()).accept(testError); + } + @Test void completeMessageWithTransaction() { // Arrange @@ -318,6 +456,7 @@ void peekBatchMessagesMax() { } }); }); + when(asyncClient.peekMessages(maxMessages)).thenReturn(messages); // Act @@ -382,15 +521,13 @@ void peekBatchMessagesMaxSequenceNumber() { // Arrange final int maxMessages = 10; final long sequenceNumber = 100; - final Flux messages = Flux.create(sink -> { - sink.onRequest(number -> { - for (int i = 0; i < maxMessages; i++) { - sink.next(mock(ServiceBusReceivedMessage.class)); - } + final Flux messages = Flux.create(sink -> sink.onRequest(number -> { + for (int i = 0; i < maxMessages; i++) { + sink.next(mock(ServiceBusReceivedMessage.class)); + } - sink.complete(); - }); - }); + sink.complete(); + })); when(asyncClient.peekMessagesAt(maxMessages, sequenceNumber)).thenReturn(messages); // Act From 4c7b888fc8bb038b0c8500118f4d1f4d8d8b5f04 Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 1 Sep 2020 17:22:09 -0700 Subject: [PATCH 11/13] Adding javadoc. --- .../azure/messaging/servicebus/ServiceBusReceiverClient.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index 2c9505c98444..5f6fd4f151ed 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -255,6 +255,7 @@ public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions dead * * @param lockToken Lock token of the message. * @param maxLockRenewalDuration Maximum duration to keep renewing the lock token. + * @param onError A function to call when an error occurs during lock renewal. * @throws NullPointerException if {@code lockToken} or {@code maxLockRenewalDuration} is null. * @throws IllegalArgumentException if {@code lockToken} is an empty string. * @throws IllegalStateException if the receiver is a session receiver or the receiver is disposed. @@ -274,8 +275,9 @@ public void getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDur * * @param sessionId Id for the session to renew. * @param maxLockRenewalDuration Maximum duration to keep renewing the lock token. + * @param onError A function to call when an error occurs during lock renewal. * @throws NullPointerException if {@code sessionId} or {@code maxLockRenewalDuration} is null. - * @throws IllegalArgumentException if {@code lockToken} is an empty string. + * @throws IllegalArgumentException if {@code sessionId} is an empty string. * @throws IllegalStateException if the receiver is a non-session receiver or the receiver is disposed. */ public void getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration, Consumer onError) { From 1c164d8b9a378ac29986c6a3287af2a24fac141d Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 2 Sep 2020 10:49:05 -0700 Subject: [PATCH 12/13] Fix checkstyles. --- .../servicebus/ServiceBusReceiverClient.java | 18 ++++++++++-------- ...eBusReceiverAsyncClientIntegrationTest.java | 1 - 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java index 5f6fd4f151ed..6b68488da526 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/ServiceBusReceiverClient.java @@ -260,14 +260,15 @@ public void deadLetter(ServiceBusReceivedMessage message, DeadLetterOptions dead * @throws IllegalArgumentException if {@code lockToken} is an empty string. * @throws IllegalStateException if the receiver is a session receiver or the receiver is disposed. */ - public void getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDuration, Consumer onError) { + public void getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDuration, + Consumer onError) { final Consumer throwableConsumer = onError != null - ? onError : - error -> logger.warning("Exception occurred while renewing lock token: '{}'.", lockToken, error); + ? onError + : error -> logger.warning("Exception occurred while renewing lock token: '{}'.", lockToken, error); asyncClient.getAutoRenewMessageLock(lockToken, maxLockRenewalDuration).subscribe( v -> logger.verbose("Completed renewing lock token: '{}'", lockToken), - throwableConsumer); + throwableConsumer); } /** @@ -280,14 +281,15 @@ public void getAutoRenewMessageLock(String lockToken, Duration maxLockRenewalDur * @throws IllegalArgumentException if {@code sessionId} is an empty string. * @throws IllegalStateException if the receiver is a non-session receiver or the receiver is disposed. */ - public void getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration, Consumer onError) { + public void getAutoRenewSessionLock(String sessionId, Duration maxLockRenewalDuration, + Consumer onError) { final Consumer throwableConsumer = onError != null - ? onError : - error -> logger.warning("Exception occurred while renewing session: '{}'.", sessionId, error); + ? onError + : error -> logger.warning("Exception occurred while renewing session: '{}'.", sessionId, error); asyncClient.getAutoRenewSessionLock(sessionId, maxLockRenewalDuration).subscribe( v -> logger.verbose("Completed renewing session: '{}'", sessionId), - throwableConsumer); + throwableConsumer); } /** diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java index b433106b7d46..fddf42cc1e94 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/ServiceBusReceiverAsyncClientIntegrationTest.java @@ -7,7 +7,6 @@ import com.azure.messaging.servicebus.administration.models.DeadLetterOptions; import com.azure.messaging.servicebus.implementation.DispositionStatus; import com.azure.messaging.servicebus.implementation.MessagingEntityType; -import com.azure.messaging.servicebus.models.LockRenewalStatus; import com.azure.messaging.servicebus.models.ReceiveMode; import com.azure.messaging.servicebus.models.SubQueue; import org.junit.jupiter.api.Assertions; From be888bd0cf06b82da958464ec8a833e28be78cac Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 2 Sep 2020 10:52:32 -0700 Subject: [PATCH 13/13] Fix spotbugs. --- .../com/azure/messaging/servicebus/LockRenewalOperation.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java index 197cffc1c6bf..3895b3fca3cf 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/main/java/com/azure/messaging/servicebus/LockRenewalOperation.java @@ -68,7 +68,7 @@ class LockRenewalOperation implements AutoCloseable { Objects.requireNonNull(maxLockRenewalDuration, "'maxLockRenewalDuration' cannot be null."); if (maxLockRenewalDuration.isNegative()) { - throw logger.logThrowableAsError(new IllegalArgumentException( + throw logger.logExceptionAsError(new IllegalArgumentException( "'maxLockRenewalDuration' cannot be negative.")); }