diff --git a/sdk/servicebus/microsoft-azure-servicebus/pom.xml b/sdk/servicebus/microsoft-azure-servicebus/pom.xml index a93611f25273..db2bcd698255 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/pom.xml +++ b/sdk/servicebus/microsoft-azure-servicebus/pom.xml @@ -6,7 +6,7 @@ 4.0.0 com.microsoft.azure azure-servicebus - 3.1.0 + 3.1.1 Microsoft Azure SDK for Service Bus Java library for Azure Service Bus diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ReceiveLinkHandler.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ReceiveLinkHandler.java index 804ab36fd1e6..d84007a414b6 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ReceiveLinkHandler.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ReceiveLinkHandler.java @@ -49,7 +49,7 @@ public void onLinkRemoteOpen(Event event) { this.amqpReceiver.onOpenComplete(null); } } else { - TRACE_LOGGER.debug("onLinkRemoteOpen: linkName:{}, remoteTarget:{}, remoteTarget:{}, action:{}", receiver.getName(), null, null, "waitingForError"); + TRACE_LOGGER.debug("onLinkRemoteOpen: linkName:{}, remoteSource:{}, remoteTarget:{}, action:{}", receiver.getName(), null, null, "waitingForError"); } } } diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientEntity.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientEntity.java index d0886843f906..0c29eba7c770 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientEntity.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientEntity.java @@ -46,9 +46,18 @@ protected boolean getIsClosingOrClosed() { // used to force close when entity is faulted protected final void setClosed() { synchronized (this.syncClose) { + this.isClosing = false; this.isClosed = true; } } + + protected final void setClosing() { + synchronized (this.syncClose) { + if (!this.isClosed) { + this.isClosing = true; + } + } + } public final CompletableFuture closeAsync() { if (this.getIsClosingOrClosed()) { diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java index 544ab248d185..62acf8dd83a7 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java @@ -104,9 +104,9 @@ public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver, private CompletableFuture receiveLinkReopenFuture; private final Runnable timedOutUpdateStateRequestsDaemon; private final Runnable returnMesagesLoopDaemon; - private final ScheduledFuture updateStateRequestsTimeoutChecker; - private final ScheduledFuture returnMessagesLoopRunner; private final MessagingEntityType entityType; + private ScheduledFuture updateStateRequestsTimeoutChecker; + private ScheduledFuture returnMessagesLoopRunner; // TODO: Change onReceiveComplete to handle empty deliveries. Change onError to retry updateState requests. private CoreMessageReceiver(final MessagingFactory factory, @@ -145,6 +145,11 @@ private CoreMessageReceiver(final MessagingFactory factory, this.timedOutUpdateStateRequestsDaemon = () -> { try { + if (CoreMessageReceiver.this.getIsClosed()) { + CoreMessageReceiver.this.updateStateRequestsTimeoutChecker.cancel(true); + return; + } + TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to complete timed out update state requests.", CoreMessageReceiver.this.receivePath); for (Map.Entry entry : CoreMessageReceiver.this.pendingUpdateStateRequests.entrySet()) { Duration remainingTime = entry.getValue().getTimeoutTracker().remaining(); @@ -167,6 +172,11 @@ private CoreMessageReceiver(final MessagingFactory factory, // CONTRACT: message should be delivered to the caller of MessageReceiver.receive() only from prefetched messages this.returnMesagesLoopDaemon = () -> { try { + if (CoreMessageReceiver.this.getIsClosed()) { + CoreMessageReceiver.this.returnMessagesLoopRunner.cancel(true); + return; + } + TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to return messages to waiting clients.", CoreMessageReceiver.this.receivePath); while (!CoreMessageReceiver.this.prefetchedMessages.isEmpty()) { ReceiveWorkItem currentReceive = CoreMessageReceiver.this.pendingReceives.poll(); @@ -187,11 +197,6 @@ private CoreMessageReceiver(final MessagingFactory factory, // Shouldn't throw any exception for the executor to run multiple times.. Should never come here } }; - - // As all update state requests have the same timeout, one timer is better than having one timer per request - this.updateStateRequestsTimeoutChecker = Timer.schedule(timedOutUpdateStateRequestsDaemon, CoreMessageReceiver.UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun); - // Scheduling it as a separate thread that wakes up at regular very short intervals.. Doesn't wait on incoming receive requests from callers or incoming deliveries from reactor - this.returnMessagesLoopRunner = Timer.schedule(returnMesagesLoopDaemon, CoreMessageReceiver.RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun); } // Connection has to be associated with Reactor before Creating a receiver on it. @@ -522,6 +527,11 @@ public void onOpenComplete(Exception exception) { if (exception == null) { if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) { AsyncUtil.completeFuture(this.linkOpen.getWork(), this); + + // As all update state requests have the same timeout, one timer is better than having one timer per request + this.updateStateRequestsTimeoutChecker = Timer.schedule(timedOutUpdateStateRequestsDaemon, CoreMessageReceiver.UPDATE_STATE_REQUESTS_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun); + // Scheduling it as a separate thread that wakes up at regular very short intervals.. Doesn't wait on incoming receive requests from callers or incoming deliveries from reactor + this.returnMessagesLoopRunner = Timer.schedule(returnMesagesLoopDaemon, CoreMessageReceiver.RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL, TimerType.RepeatRun); } if (this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) { @@ -746,14 +756,15 @@ private void scheduleLinkOpenTimeout(final TimeoutTracker timeout) { Timer.schedule( () -> { if (!linkOpen.getWork().isDone()) { - CoreMessageReceiver.this.closeInternals(false); - CoreMessageReceiver.this.setClosed(); - Exception operationTimedout = new TimeoutException( String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", CoreMessageReceiver.this.receiveLink.getName(), CoreMessageReceiver.this.receivePath, ZonedDateTime.now()), CoreMessageReceiver.this.lastKnownLinkError); TRACE_LOGGER.warn(operationTimedout.getMessage()); ExceptionUtil.completeExceptionally(linkOpen.getWork(), operationTimedout, CoreMessageReceiver.this, true); + + CoreMessageReceiver.this.setClosing(); + CoreMessageReceiver.this.closeInternals(false); + CoreMessageReceiver.this.setClosed(); } }, timeout.remaining(), diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java index 34099ab80b49..ba38b535ba76 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageSender.java @@ -618,14 +618,15 @@ private void initializeLinkOpen(TimeoutTracker timeout) { Timer.schedule( () -> { if (!CoreMessageSender.this.linkFirstOpen.isDone()) { - CoreMessageSender.this.closeInternals(false); - CoreMessageSender.this.setClosed(); - Exception operationTimedout = new TimeoutException( String.format(Locale.US, "Open operation on SendLink(%s) on Entity(%s) timed out at %s.", CoreMessageSender.this.sendLink.getName(), CoreMessageSender.this.getSendPath(), ZonedDateTime.now().toString()), CoreMessageSender.this.lastKnownErrorReportedAt.isAfter(Instant.now().minusSeconds(ClientConstants.SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS)) ? CoreMessageSender.this.lastKnownLinkError : null); TRACE_LOGGER.warn(operationTimedout.getMessage()); ExceptionUtil.completeExceptionally(CoreMessageSender.this.linkFirstOpen, operationTimedout, CoreMessageSender.this, true); + + CoreMessageSender.this.setClosing(); + CoreMessageSender.this.closeInternals(false); + CoreMessageSender.this.setClosed(); } }, timeout.remaining(), diff --git a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java index 3e3866fe23e9..2ca359ba36db 100644 --- a/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java +++ b/sdk/servicebus/microsoft-azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Util.java @@ -345,7 +345,7 @@ static int encodeMessageToCustomArray(Message message, byte[] encodedBytes, int // Pass little less than client timeout to the server so client doesn't time out before server times out public static Duration adjustServerTimeout(Duration clientTimeout) { - return clientTimeout.minusMillis(100); + return clientTimeout.minusMillis(200); } // This is not super stable for some reason