diff --git a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml index 33d924abf5a9..9a8b73b9b6e5 100755 --- a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml +++ b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml @@ -370,11 +370,18 @@ - - + + + + + + + + + @@ -389,4 +396,10 @@ + + + + + + diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/QueueDescription.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/QueueDescription.java index 66173b324fcc..49158c1cce95 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/QueueDescription.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/QueueDescription.java @@ -72,6 +72,9 @@ public Duration getLockDuration() { * @param lockDuration - The duration of a peek lock. Max value is 5 minutes. */ public void setLockDuration(Duration lockDuration) { + if (lockDuration == null) { + throw new IllegalArgumentException("Value cannot be null"); + } this.lockDuration = lockDuration; if (this.lockDuration.compareTo(ManagementClientConstants.MAX_DURATION) > 0) { this.lockDuration = ManagementClientConstants.MAX_DURATION; diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SubscriptionDescription.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SubscriptionDescription.java index a26e5615c33a..e624fcfe5450 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SubscriptionDescription.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SubscriptionDescription.java @@ -96,6 +96,9 @@ public Duration getLockDuration() { * @param lockDuration - The duration of a peek lock. Max value is 5 minutes. */ public void setLockDuration(Duration lockDuration) { + if (lockDuration == null) { + throw new IllegalArgumentException("Value cannot be null"); + } this.lockDuration = lockDuration; if (this.lockDuration.compareTo(ManagementClientConstants.MAX_DURATION) > 0) { this.lockDuration = ManagementClientConstants.MAX_DURATION; @@ -134,9 +137,9 @@ public Duration getDefaultMessageTimeToLive() { * See {@link #getDefaultMessageTimeToLive()} */ public void setDefaultMessageTimeToLive(Duration defaultMessageTimeToLive) { - if (defaultMessageTimeToLive != null - && (defaultMessageTimeToLive.compareTo(ManagementClientConstants.MIN_ALLOWED_TTL) < 0 - || defaultMessageTimeToLive.compareTo(ManagementClientConstants.MAX_ALLOWED_TTL) > 0)) { + if (defaultMessageTimeToLive == null + || (defaultMessageTimeToLive.compareTo(ManagementClientConstants.MIN_ALLOWED_TTL) < 0 + || defaultMessageTimeToLive.compareTo(ManagementClientConstants.MAX_ALLOWED_TTL) > 0)) { throw new IllegalArgumentException( String.format("The value must be between %s and %s.", ManagementClientConstants.MAX_ALLOWED_TTL, @@ -159,8 +162,8 @@ public Duration getAutoDeleteOnIdle() { * The minimum duration is 5 minutes. */ public void setAutoDeleteOnIdle(Duration autoDeleteOnIdle) { - if (autoDeleteOnIdle != null - && autoDeleteOnIdle.compareTo(ManagementClientConstants.MIN_ALLOWED_AUTODELETE_DURATION) < 0) { + if (autoDeleteOnIdle == null + || autoDeleteOnIdle.compareTo(ManagementClientConstants.MIN_ALLOWED_AUTODELETE_DURATION) < 0) { throw new IllegalArgumentException( String.format("The value must be greater than %s.", ManagementClientConstants.MIN_ALLOWED_AUTODELETE_DURATION)); diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SubscriptionDescriptionSerializer.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SubscriptionDescriptionSerializer.java index 0c4a54139937..2ab8af6dd5af 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SubscriptionDescriptionSerializer.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SubscriptionDescriptionSerializer.java @@ -190,20 +190,17 @@ private static SubscriptionDescription parseFromEntry(String topicName, Node xEn Node node = nList.item(i); if (node.getNodeType() == Node.ELEMENT_NODE) { Element element = (Element)node; - switch(element.getTagName()) - { + switch (element.getTagName()) { case "title": sd = new SubscriptionDescription(topicName, element.getFirstChild().getNodeValue()); break; case "content": NodeList qdNodes = element.getFirstChild().getChildNodes(); - for (int j = 0; j < qdNodes.getLength(); j++) - { + for (int j = 0; j < qdNodes.getLength(); j++) { node = qdNodes.item(j); if (node.getNodeType() == Node.ELEMENT_NODE) { element = (Element) node; - switch (element.getTagName()) - { + switch (element.getTagName()) { case "RequiresSession": sd.requiresSession = Boolean.parseBoolean(element.getFirstChild().getNodeValue()); break; @@ -246,6 +243,8 @@ private static SubscriptionDescription parseFromEntry(String topicName, Node xEn sd.forwardDeadLetteredMessagesTo = fwdDlq.getNodeValue(); } break; + default: + break; } } } diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SubscriptionRuntimeInfoSerializer.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SubscriptionRuntimeInfoSerializer.java index db19f88cd939..4970bcee3835 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SubscriptionRuntimeInfoSerializer.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/SubscriptionRuntimeInfoSerializer.java @@ -52,20 +52,17 @@ private static SubscriptionRuntimeInfo parseFromEntry(String topicPath, Node xEn Node node = nList.item(i); if (node.getNodeType() == Node.ELEMENT_NODE) { Element element = (Element)node; - switch(element.getTagName()) - { + switch (element.getTagName()) { case "title": runtimeInfo = new SubscriptionRuntimeInfo(topicPath, element.getFirstChild().getNodeValue()); break; case "content": NodeList qdNodes = element.getFirstChild().getChildNodes(); - for (int j = 0; j < qdNodes.getLength(); j++) - { + for (int j = 0; j < qdNodes.getLength(); j++) { node = qdNodes.item(j); if (node.getNodeType() == Node.ELEMENT_NODE) { element = (Element) node; - switch (element.getTagName()) - { + switch (element.getTagName()) { case "AccessedAt": runtimeInfo.setAccessedAt(Instant.parse(element.getFirstChild().getNodeValue())); break; @@ -102,6 +99,8 @@ private static SubscriptionRuntimeInfo parseFromEntry(String topicPath, Node xEn case "TransferDeadLetterMessageCount": runtimeInfo.getMessageCountDetails().setTransferDeadLetterMessageCount(Long.parseLong(element.getFirstChild().getNodeValue())); break; + default: + break; } } } diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/TopicDescription.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/TopicDescription.java index b9f0bd3b5842..b5ffd299716f 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/TopicDescription.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/TopicDescription.java @@ -31,16 +31,14 @@ public class TopicDescription { * Max length is 260 chars. Cannot start or end with a slash. * Cannot have restricted characters: '@','?','#','*' */ - public TopicDescription(String path) - { + public TopicDescription(String path) { this.setPath(path); } /** * @return the path of the topic. */ - public String getPath() - { + public String getPath() { return this.path; } @@ -49,8 +47,7 @@ public String getPath() * Max length is 260 chars. Cannot start or end with a slash. * Cannot have restricted characters: '@','?','#','*' */ - private void setPath(String path) - { + private void setPath(String path) { EntityNameHelper.checkValidTopicName(path); this.path = path; } @@ -66,8 +63,7 @@ public long getMaxSizeInMB() { /** * @param maxSize - Sets the maximum size of the topic in megabytes, which is the size of memory allocated for the topic. */ - public void setMaxSizeInMB(long maxSize) - { + public void setMaxSizeInMB(long maxSize) { this.maxSizeInMB = maxSize; } @@ -106,10 +102,9 @@ public Duration getDefaultMessageTimeToLive() { * See {@link #getDefaultMessageTimeToLive()} */ public void setDefaultMessageTimeToLive(Duration defaultMessageTimeToLive) { - if (defaultMessageTimeToLive != null && - (defaultMessageTimeToLive.compareTo(ManagementClientConstants.MIN_ALLOWED_TTL) < 0 || - defaultMessageTimeToLive.compareTo(ManagementClientConstants.MAX_ALLOWED_TTL) > 0)) - { + if (defaultMessageTimeToLive == null + || (defaultMessageTimeToLive.compareTo(ManagementClientConstants.MIN_ALLOWED_TTL) < 0 + || defaultMessageTimeToLive.compareTo(ManagementClientConstants.MAX_ALLOWED_TTL) > 0)) { throw new IllegalArgumentException( String.format("The value must be between %s and %s.", ManagementClientConstants.MAX_ALLOWED_TTL, @@ -132,9 +127,8 @@ public Duration getAutoDeleteOnIdle() { * The minimum duration is 5 minutes. */ public void setAutoDeleteOnIdle(Duration autoDeleteOnIdle) { - if (autoDeleteOnIdle != null && - autoDeleteOnIdle.compareTo(ManagementClientConstants.MIN_ALLOWED_AUTODELETE_DURATION) < 0) - { + if (autoDeleteOnIdle == null + || autoDeleteOnIdle.compareTo(ManagementClientConstants.MIN_ALLOWED_AUTODELETE_DURATION) < 0) { throw new IllegalArgumentException( String.format("The value must be greater than %s.", ManagementClientConstants.MIN_ALLOWED_AUTODELETE_DURATION)); @@ -159,10 +153,9 @@ public Duration getDuplicationDetectionHistoryTimeWindow() { * Max value is 1 day and minimum is 20 seconds. */ public void setDuplicationDetectionHistoryTimeWindow(Duration duplicationDetectionHistoryTimeWindow) { - if (duplicationDetectionHistoryTimeWindow != null && - (duplicationDetectionHistoryTimeWindow.compareTo(ManagementClientConstants.MIN_DUPLICATE_HISTORY_DURATION) < 0 || - duplicationDetectionHistoryTimeWindow.compareTo(ManagementClientConstants.MAX_DUPLICATE_HISTORY_DURATION) > 0)) - { + if (duplicationDetectionHistoryTimeWindow == null + || (duplicationDetectionHistoryTimeWindow.compareTo(ManagementClientConstants.MIN_DUPLICATE_HISTORY_DURATION) < 0 + || duplicationDetectionHistoryTimeWindow.compareTo(ManagementClientConstants.MAX_DUPLICATE_HISTORY_DURATION) > 0)) { throw new IllegalArgumentException( String.format("The value must be between %s and %s.", ManagementClientConstants.MIN_DUPLICATE_HISTORY_DURATION, diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/TopicDescriptionSerializer.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/TopicDescriptionSerializer.java index f444fe05f4fe..f3a9c4edb895 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/TopicDescriptionSerializer.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/TopicDescriptionSerializer.java @@ -176,20 +176,17 @@ private static TopicDescription parseFromEntry(Node xEntry) { Node node = nList.item(i); if (node.getNodeType() == Node.ELEMENT_NODE) { Element element = (Element)node; - switch(element.getTagName()) - { + switch (element.getTagName()) { case "title": td = new TopicDescription(element.getFirstChild().getNodeValue()); break; case "content": NodeList qdNodes = element.getFirstChild().getChildNodes(); - for (int j = 0; j < qdNodes.getLength(); j++) - { + for (int j = 0; j < qdNodes.getLength(); j++) { node = qdNodes.item(j); if (node.getNodeType() == Node.ELEMENT_NODE) { element = (Element) node; - switch (element.getTagName()) - { + switch (element.getTagName()) { case "MaxSizeInMegabytes": td.maxSizeInMB = Long.parseLong(element.getFirstChild().getNodeValue()); break; @@ -223,6 +220,8 @@ private static TopicDescription parseFromEntry(Node xEntry) { case "SupportOrdering": td.supportOrdering = Boolean.parseBoolean(element.getFirstChild().getNodeValue()); break; + default: + break; } } } diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/TopicRuntimeInfoSerializer.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/TopicRuntimeInfoSerializer.java index 843e36de7aa0..84882bf71912 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/TopicRuntimeInfoSerializer.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/management/TopicRuntimeInfoSerializer.java @@ -52,20 +52,17 @@ private static TopicRuntimeInfo parseFromEntry(Node xEntry) { Node node = nList.item(i); if (node.getNodeType() == Node.ELEMENT_NODE) { Element element = (Element)node; - switch(element.getTagName()) - { + switch (element.getTagName()) { case "title": topicRuntimeInfo = new TopicRuntimeInfo(element.getFirstChild().getNodeValue()); break; case "content": NodeList qdNodes = element.getFirstChild().getChildNodes(); - for (int j = 0; j < qdNodes.getLength(); j++) - { + for (int j = 0; j < qdNodes.getLength(); j++) { node = qdNodes.item(j); if (node.getNodeType() == Node.ELEMENT_NODE) { element = (Element) node; - switch (element.getTagName()) - { + switch (element.getTagName()) { case "AccessedAt": topicRuntimeInfo.setAccessedAt(Instant.parse(element.getFirstChild().getNodeValue())); break; @@ -105,6 +102,8 @@ private static TopicRuntimeInfo parseFromEntry(Node xEntry) { case "TransferDeadLetterMessageCount": topicRuntimeInfo.getMessageCountDetails().setTransferDeadLetterMessageCount(Long.parseLong(element.getFirstChild().getNodeValue())); break; + default: + break; } } } diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java index f9fa63e275ce..5be36e923e15 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java @@ -3,6 +3,10 @@ package com.microsoft.azure.servicebus.primitives; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; import java.time.Duration; import java.util.Properties; import java.util.UUID; @@ -190,12 +194,22 @@ private ClientConstants() { } private static String getClientVersion() { String clientVersion; final Properties properties = new Properties(); + InputStream clientPropInputStream = null; try { - properties.load(ClientConstants.class.getResourceAsStream("/client.properties")); + clientPropInputStream = ClientConstants.class.getResourceAsStream("/client.properties"); + properties.load(clientPropInputStream); clientVersion = properties.getProperty("client.version"); } catch (Exception e) { clientVersion = "NOTFOUND"; TRACE_LOGGER.error("Exception while retrieving client version. Exception: ", e.toString()); + } finally { + if (clientPropInputStream != null) { + try { + clientPropInputStream.close(); + } catch (IOException e) { + TRACE_LOGGER.error("Client Properties InputStream doesn't close properly. Exception: ", e.toString()); + } + } } return clientVersion; diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Controller.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Controller.java index a61a68f0f264..f3fd78f882c1 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Controller.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/Controller.java @@ -25,12 +25,13 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; class Controller { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(Controller.class); private MessagingFactory messagingFactory; private CoreMessageSender internalSender; - private boolean isInitialized = false; + private AtomicBoolean isInitialized = new AtomicBoolean(false); private URI namespaceEndpointURI; private ClientSettings clientSettings; @@ -41,7 +42,7 @@ class Controller { } synchronized CompletableFuture initializeAsync() { - if (this.isInitialized) { + if (this.isInitialized.get()) { return CompletableFuture.completedFuture(null); } else { TRACE_LOGGER.info("Creating MessageSender to coordinator"); @@ -54,7 +55,7 @@ synchronized CompletableFuture initializeAsync() { senderFuture.handleAsync((s, coreSenderCreationEx) -> { if (coreSenderCreationEx == null) { this.internalSender = s; - this.isInitialized = true; + this.isInitialized.set(true); TRACE_LOGGER.info("Created MessageSender to coordinator"); postSenderCreationFuture.complete(null); } else { @@ -72,7 +73,7 @@ synchronized CompletableFuture initializeAsync() { public CompletableFuture declareAsync() { Message message = Message.Factory.create(); Declare declare = new Declare(); - message.setBody(new AmqpValue(declare)); + message.setBody(new AmqpValue(declare)); return this.internalSender.sendAndReturnDeliveryStateAsync( message, diff --git a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java index fb846aafbc0b..1a5dc13cdc44 100644 --- a/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java +++ b/servicebus/data-plane/azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/CoreMessageReceiver.java @@ -63,8 +63,7 @@ */ // TODO Take a re-look at the choice of collections used. Some of them are overkill may be. -public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver, IErrorContextProvider -{ +public class CoreMessageReceiver extends ClientEntity implements IAmqpReceiver, IErrorContextProvider { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CoreMessageReceiver.class); private static final Duration LINK_REOPEN_TIMEOUT = Duration.ofMinutes(5); // service closes link long before this timeout expires private static final Duration RETURN_MESSAGES_DAEMON_WAKE_UP_INTERVAL = Duration.ofMillis(1); // Wakes up every 1 millisecond @@ -116,8 +115,7 @@ private CoreMessageReceiver(final MessagingFactory factory, final String sessionId, final int prefetchCount, final SettleModePair settleModePair, - final MessagingEntityType entityType) - { + final MessagingEntityType entityType) { super(name); this.underlyingFactory = factory; @@ -129,12 +127,12 @@ private CoreMessageReceiver(final MessagingFactory factory, this.isBrowsableSession = false; this.prefetchCount = prefetchCount; this.settleModePair = settleModePair; - this.prefetchedMessages = new ConcurrentLinkedQueue(); - this.linkClose = new CompletableFuture(); + this.prefetchedMessages = new ConcurrentLinkedQueue<>(); + this.linkClose = new CompletableFuture<>(); this.lastKnownLinkError = null; this.prefetchCountSync = new Object(); this.retryPolicy = factory.getRetryPolicy(); - this.pendingReceives = new ConcurrentLinkedQueue(); + this.pendingReceives = new ConcurrentLinkedQueue<>(); this.pendingUpdateStateRequests = new ConcurrentHashMap<>(); this.tagsToDeliveriesMap = new ConcurrentHashMap<>(); @@ -145,68 +143,48 @@ private CoreMessageReceiver(final MessagingFactory factory, this.currentPrefetechedMessagesCount = new AtomicInteger(); this.entityType = entityType; - this.timedOutUpdateStateRequestsDaemon = new Runnable() { - @Override - public void run() { - try - { - TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to complete timed out update state requests.", CoreMessageReceiver.this.receivePath); - for(Map.Entry entry : CoreMessageReceiver.this.pendingUpdateStateRequests.entrySet()) - { - Duration remainingTime = entry.getValue().getTimeoutTracker().remaining(); - if(remainingTime.isZero() || remainingTime.isNegative()) - { - CoreMessageReceiver.this.pendingUpdateStateRequests.remove(entry.getKey()); - Exception exception = entry.getValue().getLastKnownException(); - if(exception == null) - { - exception = new TimeoutException("Request timed out."); - } - TRACE_LOGGER.error("UpdateState request timed out. Delivery:{}", entry.getKey(), exception); - AsyncUtil.completeFutureExceptionally(entry.getValue().getWork(), exception); + this.timedOutUpdateStateRequestsDaemon = () -> { + try { + TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to complete timed out update state requests.", CoreMessageReceiver.this.receivePath); + for (Map.Entry entry : CoreMessageReceiver.this.pendingUpdateStateRequests.entrySet()) { + Duration remainingTime = entry.getValue().getTimeoutTracker().remaining(); + if (remainingTime.isZero() || remainingTime.isNegative()) { + CoreMessageReceiver.this.pendingUpdateStateRequests.remove(entry.getKey()); + Exception exception = entry.getValue().getLastKnownException(); + if (exception == null) { + exception = new TimeoutException("Request timed out."); } + TRACE_LOGGER.error("UpdateState request timed out. Delivery:{}", entry.getKey(), exception); + AsyncUtil.completeFutureExceptionally(entry.getValue().getWork(), exception); } - TRACE_LOGGER.trace("'{}' core message receiver's internal loop to complete timed out update state requests stopped.", CoreMessageReceiver.this.receivePath); - } - catch(Throwable e) - { - // Shouldn't throw any exception for the executor to run multiple times.. Should never come here } + TRACE_LOGGER.trace("'{}' core message receiver's internal loop to complete timed out update state requests stopped.", CoreMessageReceiver.this.receivePath); + } catch(Throwable e) { + // Shouldn't throw any exception for the executor to run multiple times.. Should never come here } }; // CONTRACT: message should be delivered to the caller of MessageReceiver.receive() only from prefetched messages - this.returnMesagesLoopDaemon = new Runnable() { - @Override - public void run() { - try - { - TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to return messages to waiting clients.", CoreMessageReceiver.this.receivePath); - while(!CoreMessageReceiver.this.prefetchedMessages.isEmpty()) - { - ReceiveWorkItem currentReceive = CoreMessageReceiver.this.pendingReceives.poll(); - if (currentReceive != null) - { - if(!currentReceive.getWork().isDone()) - { - TRACE_LOGGER.debug("Returning the message received from '{}' to a pending receive request", CoreMessageReceiver.this.receivePath); - currentReceive.cancelTimeoutTask(false); - List messages = CoreMessageReceiver.this.receiveCore(currentReceive.getMaxMessageCount()); - CoreMessageReceiver.this.reduceCreditForCompletedReceiveRequest(currentReceive.getMaxMessageCount()); - AsyncUtil.completeFuture(currentReceive.getWork(), messages); - } - } - else - { - break; + this.returnMesagesLoopDaemon = () -> { + try { + TRACE_LOGGER.trace("Starting '{}' core message receiver's internal loop to return messages to waiting clients.", CoreMessageReceiver.this.receivePath); + while (!CoreMessageReceiver.this.prefetchedMessages.isEmpty()) { + ReceiveWorkItem currentReceive = CoreMessageReceiver.this.pendingReceives.poll(); + if (currentReceive != null) { + if (!currentReceive.getWork().isDone()) { + TRACE_LOGGER.debug("Returning the message received from '{}' to a pending receive request", CoreMessageReceiver.this.receivePath); + currentReceive.cancelTimeoutTask(false); + List messages = CoreMessageReceiver.this.receiveCore(currentReceive.getMaxMessageCount()); + CoreMessageReceiver.this.reduceCreditForCompletedReceiveRequest(currentReceive.getMaxMessageCount()); + AsyncUtil.completeFuture(currentReceive.getWork(), messages); } + } else { + break; } - TRACE_LOGGER.trace("'{}' core message receiver's internal loop to return messages to waiting clients stopped.", CoreMessageReceiver.this.receivePath); - } - catch(Throwable e) - { - // Shouldn't throw any exception for the executor to run multiple times.. Should never come here } + TRACE_LOGGER.trace("'{}' core message receiver's internal loop to return messages to waiting clients stopped.", CoreMessageReceiver.this.receivePath); + } catch(Throwable e) { + // Shouldn't throw any exception for the executor to run multiple times.. Should never come here } }; @@ -223,8 +201,7 @@ public static CompletableFuture create( final String name, final String recvPath, final int prefetchCount, - final SettleModePair settleModePair) - { + final SettleModePair settleModePair) { return create(factory, name, recvPath, prefetchCount, settleModePair, null); } @@ -236,8 +213,7 @@ public static CompletableFuture create( final String sessionId, final boolean isBrowsableSession, final int prefetchCount, - final SettleModePair settleModePair) - { + final SettleModePair settleModePair) { return create(factory, name, recvPath, sessionId, isBrowsableSession, prefetchCount, settleModePair, null); } @@ -247,8 +223,7 @@ public static CompletableFuture create( final String recvPath, final int prefetchCount, final SettleModePair settleModePair, - final MessagingEntityType entityType) - { + final MessagingEntityType entityType) { TRACE_LOGGER.info("Creating core message receiver to '{}'", recvPath); CoreMessageReceiver msgReceiver = new CoreMessageReceiver( factory, @@ -269,8 +244,7 @@ public static CompletableFuture create( final boolean isBrowsableSession, final int prefetchCount, final SettleModePair settleModePair, - final MessagingEntityType entityType) - { + final MessagingEntityType entityType) { TRACE_LOGGER.info("Creating core session receiver to '{}', sessionId '{}', browseonly session '{}'", recvPath, sessionId, isBrowsableSession); CoreMessageReceiver msgReceiver = new CoreMessageReceiver( factory, @@ -285,32 +259,23 @@ public static CompletableFuture create( return msgReceiver.createLink(); } - private CompletableFuture createLink() - { - this.linkOpen = new WorkItem(new CompletableFuture(), this.operationTimeout); + private CompletableFuture createLink() { + this.linkOpen = new WorkItem<>(new CompletableFuture<>(), this.operationTimeout); this.scheduleLinkOpenTimeout(this.linkOpen.getTimeoutTracker()); this.sendTokenAndSetRenewTimer(false).handleAsync((v, sasTokenEx) -> { - if(sasTokenEx != null) - { + if (sasTokenEx != null) { Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sasTokenEx); TRACE_LOGGER.error("Sending SAS Token failed. ReceivePath:{}", this.receivePath, cause); this.linkOpen.getWork().completeExceptionally(cause); - } - else - { - try - { - this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() - { + } else { + try { + this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { @Override - public void onEvent() - { + public void onEvent() { CoreMessageReceiver.this.createReceiveLink(); } }); - } - catch (IOException ioException) - { + } catch (IOException ioException) { this.cancelSASTokenRenewTimer(); this.linkOpen.getWork().completeExceptionally(new ServiceBusException(false, "Failed to create Receiver, see cause for more details.", ioException)); } @@ -322,26 +287,19 @@ public void onEvent() return this.linkOpen.getWork(); } - private CompletableFuture createRequestResponseLinkAsync() - { + private CompletableFuture createRequestResponseLinkAsync() { synchronized (this.requestResonseLinkCreationLock) { - if(this.requestResponseLinkCreationFuture == null) - { - this.requestResponseLinkCreationFuture = new CompletableFuture(); - this.underlyingFactory.obtainRequestResponseLinkAsync(this.receivePath, this.entityType).handleAsync((rrlink, ex) -> - { - if(ex == null) - { + if (this.requestResponseLinkCreationFuture == null) { + this.requestResponseLinkCreationFuture = new CompletableFuture<>(); + this.underlyingFactory.obtainRequestResponseLinkAsync(this.receivePath, this.entityType).handleAsync((rrlink, ex) -> { + if (ex == null) { this.requestResponseLink = rrlink; this.requestResponseLinkCreationFuture.complete(null); - } - else - { + } else { Throwable cause = ExceptionUtil.extractAsyncCompletionCause(ex); this.requestResponseLinkCreationFuture.completeExceptionally(cause); // Set it to null so next call will retry rr link creation - synchronized (this.requestResonseLinkCreationLock) - { + synchronized (this.requestResonseLinkCreationLock) { this.requestResponseLinkCreationFuture = null; } } @@ -353,12 +311,9 @@ private CompletableFuture createRequestResponseLinkAsync() } } - private void closeRequestResponseLink() - { - synchronized (this.requestResonseLinkCreationLock) - { - if(this.requestResponseLinkCreationFuture != null) - { + private void closeRequestResponseLink() { + synchronized (this.requestResonseLinkCreationLock) { + if (this.requestResponseLinkCreationFuture != null) { this.requestResponseLinkCreationFuture.thenRun(() -> { this.underlyingFactory.releaseRequestResponseLink(this.receivePath); this.requestResponseLink = null; @@ -368,8 +323,7 @@ private void closeRequestResponseLink() } } - private void createReceiveLink() - { + private void createReceiveLink() { TRACE_LOGGER.info("Creating receive link to '{}'", this.receivePath); Connection connection = this.underlyingFactory.getConnection(); @@ -389,13 +343,11 @@ private void createReceiveLink() Map linkProperties = new HashMap<>(); // ServiceBus expects timeout to be of type unsignedint linkProperties.put(ClientConstants.LINK_TIMEOUT_PROPERTY, UnsignedInteger.valueOf(Util.adjustServerTimeout(this.underlyingFactory.getOperationTimeout()).toMillis())); - if(this.entityType != null) - { + if (this.entityType != null) { linkProperties.put(ClientConstants.ENTITY_TYPE_PROPERTY, this.entityType.getIntValue()); } - if(this.isSessionReceiver) - { + if (this.isSessionReceiver) { HashMap filterMap = new HashMap(); filterMap.put(ClientConstants.SESSION_FILTER, this.sessionId); source.setFilter(filterMap); @@ -420,54 +372,42 @@ private void createReceiveLink() this.underlyingFactory.registerForConnectionError(this.receiveLink); } - CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure) - { - if(this.getIsClosingOrClosed()) - { + CompletableFuture sendTokenAndSetRenewTimer(boolean retryOnFailure) { + if (this.getIsClosingOrClosed()) { return CompletableFuture.completedFuture(null); - } - else - { + } else { CompletableFuture> sendTokenFuture = this.underlyingFactory.sendSecurityTokenAndSetRenewTimer(this.sasTokenAudienceURI, retryOnFailure, () -> this.sendTokenAndSetRenewTimer(true)); - return sendTokenFuture.thenAccept((f) -> {this.sasTokenRenewTimerFuture = f;}); + return sendTokenFuture.thenAccept((f) -> this.sasTokenRenewTimerFuture = f); } } - private void throwIfInUnusableState() - { - if(this.isSessionReceiver && this.isSessionLockLost) - { + private void throwIfInUnusableState() { + if(this.isSessionReceiver && this.isSessionLockLost) { throw new IllegalStateException("Session lock lost and cannot be used. Close this session and accept another session."); } this.throwIfClosed(this.lastKnownLinkError); } - private void cancelSASTokenRenewTimer() - { - if(this.sasTokenRenewTimerFuture != null && !this.sasTokenRenewTimerFuture.isDone()) - { + private void cancelSASTokenRenewTimer() { + if (this.sasTokenRenewTimerFuture != null && !this.sasTokenRenewTimerFuture.isDone()) { this.sasTokenRenewTimerFuture.cancel(true); TRACE_LOGGER.debug("Cancelled SAS Token renew timer"); } } - private List receiveCore(int messageCount) - { + private List receiveCore(int messageCount) { List returnMessages = null; MessageWithDeliveryTag currentMessage = this.prefetchedMessages.poll(); int returnedMessageCount = 0; - while (currentMessage != null) - { + while (currentMessage != null) { this.currentPrefetechedMessagesCount.decrementAndGet(); - if (returnMessages == null) - { - returnMessages = new LinkedList(); + if (returnMessages == null) { + returnMessages = new LinkedList<>(); } returnMessages.add(currentMessage); - if (++returnedMessageCount >= messageCount) - { + if (++returnedMessageCount >= messageCount) { break; } @@ -477,150 +417,114 @@ private List receiveCore(int messageCount) return returnMessages; } - public int getPrefetchCount() - { - synchronized (this.prefetchCountSync) - { + public int getPrefetchCount() { + synchronized (this.prefetchCountSync) { return this.prefetchCount; } } - public String getSessionId() - { + public String getSessionId() { return this.sessionId; } - public Instant getSessionLockedUntilUtc() - { - if(this.isSessionReceiver) - { + public Instant getSessionLockedUntilUtc() { + if (this.isSessionReceiver) { return this.sessionLockedUntilUtc; - } - else - { + } else { throw new RuntimeException("Object is not a session receiver"); } } - public void setPrefetchCount(final int value) throws ServiceBusException - { - if(value < 0) - { + public void setPrefetchCount(final int value) throws ServiceBusException { + if (value < 0) { throw new IllegalArgumentException("Prefetch count cannot be negative."); } this.throwIfInUnusableState(); final int deltaPrefetchCount; - synchronized (this.prefetchCountSync) - { + synchronized (this.prefetchCountSync) { deltaPrefetchCount = value - this.prefetchCount; this.prefetchCount = value; TRACE_LOGGER.info("Setting prefetch count to '{}' on recieve link to '{}'", value, this.receivePath); } - if(deltaPrefetchCount > 0) - { - try - { - this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() - { + if (deltaPrefetchCount > 0) { + try { + this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { @Override - public void onEvent() - { + public void onEvent() { sendFlow(deltaPrefetchCount); } }); - } - catch (IOException ioException) - { + } catch (IOException ioException) { throw new ServiceBusException(false, "Setting prefetch count failed, see cause for more details", ioException); } } } - public CompletableFuture> receiveAsync(final int maxMessageCount, Duration timeout) - { + public CompletableFuture> receiveAsync(final int maxMessageCount, Duration timeout) { this.throwIfInUnusableState(); - if (maxMessageCount <= 0) - { + if (maxMessageCount <= 0) { throw new IllegalArgumentException("parameter 'maxMessageCount' should be a positive number"); } TRACE_LOGGER.debug("Receiving maximum of '{}' messages from '{}'", maxMessageCount, this.receivePath); - CompletableFuture> onReceive = new CompletableFuture>(); + CompletableFuture> onReceive = new CompletableFuture<>(); final ReceiveWorkItem receiveWorkItem = new ReceiveWorkItem(onReceive, timeout, maxMessageCount); this.creditNeededtoServePendingReceives.addAndGet(maxMessageCount); this.pendingReceives.add(receiveWorkItem); // ZERO timeout is special case in SBMP clients where the timeout is sent to the service along with request. It meant 'give me messages you already have, but don't wait'. // As we don't send timeout to service in AMQP, treating this as a special case and using a very short timeout - if(timeout == Duration.ZERO) - { + if (timeout == Duration.ZERO) { timeout = ZERO_TIMEOUT_APPROXIMATION; } Timer.schedule( - new Runnable() - { - public void run() - { - if( CoreMessageReceiver.this.pendingReceives.remove(receiveWorkItem)) - { - CoreMessageReceiver.this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount()); - TRACE_LOGGER.info("No messages received from '{}'. Pending receive request timed out. Returning null to the client.", CoreMessageReceiver.this.receivePath); - AsyncUtil.completeFuture(receiveWorkItem.getWork(), null); - } - } - }, - timeout, - TimerType.OneTimeRun); + () -> { + if (CoreMessageReceiver.this.pendingReceives.remove(receiveWorkItem)) { + CoreMessageReceiver.this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount()); + TRACE_LOGGER.info("No messages received from '{}'. Pending receive request timed out. Returning null to the client.", CoreMessageReceiver.this.receivePath); + AsyncUtil.completeFuture(receiveWorkItem.getWork(), null); + } + }, + timeout, + TimerType.OneTimeRun); - this.ensureLinkIsOpen().thenRun(() -> {this.addCredit(receiveWorkItem);}); + this.ensureLinkIsOpen().thenRun(() -> this.addCredit(receiveWorkItem)); return onReceive; } @Override - public void onOpenComplete(Exception exception) - { - if (exception == null) - { + public void onOpenComplete(Exception exception) { + if (exception == null) { TRACE_LOGGER.info("Receive link to '{}' opened.", this.receivePath); - if(this.isSessionReceiver) - { - Map remoteSourceFilter = ((Source)this.receiveLink.getRemoteSource()).getFilter(); - if(remoteSourceFilter != null && remoteSourceFilter.containsKey(ClientConstants.SESSION_FILTER)) - { - String remoteSessionId = (String)remoteSourceFilter.get(ClientConstants.SESSION_FILTER); + if (this.isSessionReceiver) { + Map remoteSourceFilter = ((Source) this.receiveLink.getRemoteSource()).getFilter(); + if (remoteSourceFilter != null && remoteSourceFilter.containsKey(ClientConstants.SESSION_FILTER)) { + String remoteSessionId = (String) remoteSourceFilter.get(ClientConstants.SESSION_FILTER); this.sessionId = remoteSessionId; - if(this.receiveLink.getRemoteProperties() != null && this.receiveLink.getRemoteProperties().containsKey(ClientConstants.LOCKED_UNTIL_UTC)) - { + if (this.receiveLink.getRemoteProperties() != null && this.receiveLink.getRemoteProperties().containsKey(ClientConstants.LOCKED_UNTIL_UTC)) { this.sessionLockedUntilUtc = Util.convertDotNetTicksToInstant((long)this.receiveLink.getRemoteProperties().get(ClientConstants.LOCKED_UNTIL_UTC)); - } - else - { + } else { TRACE_LOGGER.warn("Accepted a session with id '{}', from '{}' which didn't set '{}' property on the receive link.", this.sessionId, this.receivePath, ClientConstants.LOCKED_UNTIL_UTC); this.sessionLockedUntilUtc = Instant.ofEpochMilli(0); } TRACE_LOGGER.info("Accepted session with id '{}', lockedUntilUtc '{}' from '{}'.", this.sessionId, this.sessionLockedUntilUtc, this.receivePath); - } - else - { + } else { exception = new ServiceBusException(false, "SessionId filter not set on the remote source."); } } } - if (exception == null) - { - if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) - { + if (exception == null) { + if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) { AsyncUtil.completeFuture(this.linkOpen.getWork(), this); } - if(this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) - { + if (this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) { AsyncUtil.completeFuture(this.receiveLinkReopenFuture, null); } @@ -632,20 +536,16 @@ public void onOpenComplete(Exception exception) TRACE_LOGGER.debug("receiverPath:{}, linkname:{}, updated-link-credit:{}, sentCredits:{}", this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), this.prefetchCount); - } - else - { + } else { this.cancelSASTokenRenewTimer(); - if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) - { + if (this.linkOpen != null && !this.linkOpen.getWork().isDone()) { TRACE_LOGGER.error("Opening receive link '{}' to '{}' failed.", this.receiveLink.getName(), this.receivePath, exception); this.setClosed(); ExceptionUtil.completeExceptionally(this.linkOpen.getWork(), exception, this, true); } - if(this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) - { + if (this.receiveLinkReopenFuture != null && !this.receiveLinkReopenFuture.isDone()) { TRACE_LOGGER.warn("Opening receive link '{}' to '{}' failed.", this.receiveLink.getName(), this.receivePath, exception); AsyncUtil.completeFutureExceptionally(this.receiveLinkReopenFuture, exception); } @@ -655,27 +555,21 @@ public void onOpenComplete(Exception exception) } @Override - public void onReceiveComplete(Delivery delivery) - { + public void onReceiveComplete(Delivery delivery) { this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId()); byte[] deliveryTag = delivery.getTag(); String deliveryTagAsString = StringUtil.convertBytesToString(delivery.getTag()); TRACE_LOGGER.debug("Received a delivery '{}' from '{}'", deliveryTagAsString, this.receivePath); - if(deliveryTag == null || deliveryTag.length == 0 || !this.tagsToDeliveriesMap.containsKey(deliveryTagAsString)) - { + if (deliveryTag == null || deliveryTag.length == 0 || !this.tagsToDeliveriesMap.containsKey(deliveryTagAsString)) { TRACE_LOGGER.debug("Received a message from '{}'. Adding to prefecthed messages.", this.receivePath); - try - { + try { Message message = Util.readMessageFromDelivery(receiveLink, delivery); - if(this.settleModePair.getSenderSettleMode() == SenderSettleMode.SETTLED) - { + if (this.settleModePair.getSenderSettleMode() == SenderSettleMode.SETTLED) { // No op. Delivery comes settled from the sender delivery.disposition(Accepted.getInstance()); delivery.settle(); - } - else - { + } else { this.tagsToDeliveriesMap.put(StringUtil.convertBytesToString(delivery.getTag()), delivery); receiveLink.advance(); } @@ -683,161 +577,125 @@ public void onReceiveComplete(Delivery delivery) // Accuracy of count is not that important. So not making those two operations atomic this.currentPrefetechedMessagesCount.incrementAndGet(); this.prefetchedMessages.add(new MessageWithDeliveryTag(message, delivery.getTag())); - } - catch(Exception e) - { + } catch(Exception e) { TRACE_LOGGER.warn("Reading message from delivery '{}' from '{}', session '{}' failed with unexpected exception.", deliveryTagAsString, this.receivePath, this.sessionId, e); delivery.disposition(Released.getInstance()); delivery.settle(); return; } - } - else - { + } else { DeliveryState remoteState = delivery.getRemoteState(); TRACE_LOGGER.debug("Received a delivery '{}' with state '{}' from '{}'", deliveryTagAsString, remoteState, this.receivePath); Outcome remoteOutcome = null; if (remoteState instanceof Outcome) { - remoteOutcome = (Outcome)remoteState; + remoteOutcome = (Outcome) remoteState; } else if (remoteState instanceof TransactionalState) { - remoteOutcome = ((TransactionalState)remoteState).getOutcome(); + remoteOutcome = ((TransactionalState) remoteState).getOutcome(); } - if(remoteOutcome != null) - { + if (remoteOutcome != null) { UpdateStateWorkItem matchingUpdateStateWorkItem = this.pendingUpdateStateRequests.get(deliveryTagAsString); - if(matchingUpdateStateWorkItem != null) - { + if (matchingUpdateStateWorkItem != null) { DeliveryState matchingUpdateWorkItemDeliveryState = matchingUpdateStateWorkItem.getDeliveryState(); if (matchingUpdateWorkItemDeliveryState instanceof TransactionalState) { - matchingUpdateWorkItemDeliveryState = (DeliveryState)((TransactionalState) matchingUpdateWorkItemDeliveryState).getOutcome(); + matchingUpdateWorkItemDeliveryState = (DeliveryState) ((TransactionalState) matchingUpdateWorkItemDeliveryState).getOutcome(); } // This comparison is ugly. Using it for the lack of equals operation on Outcome classes - if(remoteOutcome.getClass().getName().equals(matchingUpdateWorkItemDeliveryState.getClass().getName())) - { + if (remoteOutcome.getClass().getName().equals(matchingUpdateWorkItemDeliveryState.getClass().getName())) { TRACE_LOGGER.debug("Completing a pending updateState operation for delivery '{}' from '{}'", deliveryTagAsString, this.receivePath); this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, null); - } - else - { -// if(matchingUpdateStateWorkItem.expectedOutcome instanceof Accepted) -// { + } else { +// if(matchingUpdateStateWorkItem.expectedOutcome instanceof Accepted) +// { TRACE_LOGGER.warn("Received delivery '{}' state '{}' doesn't match expected state '{}'", deliveryTagAsString, remoteState, matchingUpdateStateWorkItem.deliveryState); // Complete requests - if(remoteOutcome instanceof Rejected) - { + if (remoteOutcome instanceof Rejected) { Rejected rejected = (Rejected) remoteOutcome; ErrorCondition error = rejected.getError(); Exception exception = ExceptionUtil.toException(error); - if (ExceptionUtil.isGeneralError(error.getCondition())) - { + if (ExceptionUtil.isGeneralError(error.getCondition())) { this.lastKnownLinkError = exception; this.lastKnownErrorReportedAt = Instant.now(); } Duration retryInterval = this.retryPolicy.getNextRetryInterval(this.getClientId(), exception, matchingUpdateStateWorkItem.getTimeoutTracker().remaining()); - if (retryInterval == null) - { + if (retryInterval == null) { TRACE_LOGGER.error("Completing pending updateState operation for delivery '{}' with exception", deliveryTagAsString, exception); this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, exception); - } - else - { + } else { matchingUpdateStateWorkItem.setLastKnownException(exception); // Retry after retry interval TRACE_LOGGER.debug("Pending updateState operation for delivery '{}' will be retried after '{}'", deliveryTagAsString, retryInterval); try { this.underlyingFactory.scheduleOnReactorThread((int) retryInterval.toMillis(), new DeliveryStateDispatchHandler(delivery, matchingUpdateStateWorkItem.getDeliveryState())); - } - catch (IOException ioException) - { + } catch (IOException ioException) { this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, new ServiceBusException(false, "Operation failed while scheduling a retry on Reactor, see cause for more details.", ioException)); } } - } - else if (remoteOutcome instanceof Released) - { + } else if (remoteOutcome instanceof Released) { Exception exception = new OperationCancelledException(remoteOutcome.toString()); TRACE_LOGGER.error("Completing pending updateState operation for delivery '{}' with exception", deliveryTagAsString, exception); this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, exception); - } - else - { + } else { Exception exception = new ServiceBusException(false, remoteOutcome.toString()); TRACE_LOGGER.error("Completing pending updateState operation for delivery '{}' with exception", deliveryTagAsString, exception); this.completePendingUpdateStateWorkItem(delivery, deliveryTagAsString, matchingUpdateStateWorkItem, exception); } -// } +// } } - } - else - { + } else { // Should not happen. Ignore it } - } - else - { + } else { //Ignore it. we are only interested in terminal delivery states } } } @Override - public void onError(Exception exception) - { + public void onError(Exception exception) { this.creditToFlow.set(0); this.cancelSASTokenRenewTimer(); - if(this.settleModePair.getSenderSettleMode() == SenderSettleMode.UNSETTLED) - { + if (this.settleModePair.getSenderSettleMode() == SenderSettleMode.UNSETTLED) { this.prefetchedMessages.clear(); this.currentPrefetechedMessagesCount.set(0); this.tagsToDeliveriesMap.clear(); } - if (this.getIsClosingOrClosed()) - { + if (this.getIsClosingOrClosed()) { TRACE_LOGGER.info("Receive link to '{}', sessionId '{}' closed", this.receivePath, this.sessionId); AsyncUtil.completeFuture(this.linkClose, null); this.clearAllPendingWorkItems(exception); - } - else - { + } else { this.underlyingFactory.deregisterForConnectionError(this.receiveLink); TRACE_LOGGER.warn("Receive link '{}' to '{}', sessionId '{}' closed with error.", this.receiveLink.getName(), this.receivePath, this.sessionId, exception); this.lastKnownLinkError = exception; - if ((this.linkOpen != null && !this.linkOpen.getWork().isDone()) || - (this.receiveLinkReopenFuture !=null && !receiveLinkReopenFuture.isDone())) - { + if ((this.linkOpen != null && !this.linkOpen.getWork().isDone()) + || (this.receiveLinkReopenFuture != null && !receiveLinkReopenFuture.isDone())) { this.onOpenComplete(exception); } - if (exception != null && - (!(exception instanceof ServiceBusException) || !((ServiceBusException) exception).getIsTransient())) - { + if (exception != null + && (!(exception instanceof ServiceBusException) || !((ServiceBusException) exception).getIsTransient())) { this.clearAllPendingWorkItems(exception); - if(this.isSessionReceiver && (exception instanceof SessionLockLostException || exception instanceof SessionCannotBeLockedException)) - { + if (this.isSessionReceiver && (exception instanceof SessionLockLostException || exception instanceof SessionCannotBeLockedException)) { // No point in retrying to establish a link.. SessionLock is lost TRACE_LOGGER.warn("SessionId '{}' lock lost. Closing receiver.", this.sessionId); this.isSessionLockLost = true; this.closeAsync(); } - } - else - { + } else { // TODO Why recreating link needs to wait for retry interval of pending receive? ReceiveWorkItem workItem = this.pendingReceives.peek(); - if (workItem != null && workItem.getTimeoutTracker() != null) - { + if (workItem != null && workItem.getTimeoutTracker() != null) { Duration nextRetryInterval = this.underlyingFactory.getRetryPolicy() .getNextRetryInterval(this.getClientId(), exception, workItem.getTimeoutTracker().remaining()); - if (nextRetryInterval != null) - { + if (nextRetryInterval != null) { TRACE_LOGGER.info("Receive link '{}' to '{}', sessionId '{}' will be reopened after '{}'", this.receiveLink.getName(), this.receivePath, this.sessionId, nextRetryInterval); Timer.schedule(() -> {CoreMessageReceiver.this.ensureLinkIsOpen();}, nextRetryInterval, TimerType.OneTimeRun); } @@ -846,39 +704,30 @@ public void onError(Exception exception) } } - private void reduceCreditForCompletedReceiveRequest(int maxCreditCountOfReceiveRequest) - { + private void reduceCreditForCompletedReceiveRequest(int maxCreditCountOfReceiveRequest) { this.creditNeededtoServePendingReceives.updateAndGet((c) -> { int updatedCredit = c - maxCreditCountOfReceiveRequest; return (updatedCredit > 0) ? updatedCredit : 0; }); } - private void addCredit(ReceiveWorkItem receiveWorkItem) - { + private void addCredit(ReceiveWorkItem receiveWorkItem) { // Timed out receive requests and batch receive requests completed with less than maxCount messages might have sent more credit // than consumed by the receiver resulting in excess credit at the service endpoint. int creditToFlowForWorkItem = this.creditNeededtoServePendingReceives.get() - (this.receiveLink.getCredit() + this.currentPrefetechedMessagesCount.get() + this.creditToFlow.get()) + this.prefetchCount; - if(creditToFlowForWorkItem > 0) - { + if (creditToFlowForWorkItem > 0) { int currentTotalCreditToSend = this.creditToFlow.addAndGet(creditToFlowForWorkItem); - if(currentTotalCreditToSend >= this.prefetchCount || currentTotalCreditToSend >= CREDIT_FLOW_BATCH_SIZE) - { - try - { - this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() - { + if (currentTotalCreditToSend >= this.prefetchCount || currentTotalCreditToSend >= CREDIT_FLOW_BATCH_SIZE) { + try { + this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { @Override - public void onEvent() - { + public void onEvent() { // Send credit accumulated so far to make it less chat-ty int accumulatedCredit = CoreMessageReceiver.this.creditToFlow.getAndSet(0); sendFlow(accumulatedCredit); } }); - } - catch (IOException ioException) - { + } catch (IOException ioException) { this.pendingReceives.remove(receiveWorkItem); this.reduceCreditForCompletedReceiveRequest(receiveWorkItem.getMaxMessageCount()); receiveWorkItem.getWork().completeExceptionally(generateDispatacherSchedulingFailedException("completeMessage", ioException)); @@ -888,80 +737,61 @@ public void onEvent() } } - private void sendFlow(int credits) - { - if(!this.isBrowsableSession && credits > 0) - { + private void sendFlow(int credits) { + if (!this.isBrowsableSession && credits > 0) { this.receiveLink.flow(credits); TRACE_LOGGER.debug("Sent flow to the service. receiverPath:{}, linkname:{}, updated-link-credit:{}, sentCredits:{}", this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), credits); } } - private void scheduleLinkOpenTimeout(final TimeoutTracker timeout) - { + private void scheduleLinkOpenTimeout(final TimeoutTracker timeout) { // timer to signal a timeout if exceeds the operationTimeout on MessagingFactory Timer.schedule( - new Runnable() - { - public void run() - { - if (!linkOpen.getWork().isDone()) - { - CoreMessageReceiver.this.closeInternals(false); - CoreMessageReceiver.this.setClosed(); - - Exception operationTimedout = new TimeoutException( - String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", CoreMessageReceiver.this.receiveLink.getName(), CoreMessageReceiver.this.receivePath, ZonedDateTime.now()), - CoreMessageReceiver.this.lastKnownLinkError); - TRACE_LOGGER.warn(operationTimedout.getMessage()); - ExceptionUtil.completeExceptionally(linkOpen.getWork(), operationTimedout, CoreMessageReceiver.this, true); - } - } + () -> { + if (!linkOpen.getWork().isDone()) { + CoreMessageReceiver.this.closeInternals(false); + CoreMessageReceiver.this.setClosed(); + + Exception operationTimedout = new TimeoutException( + String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", CoreMessageReceiver.this.receiveLink.getName(), CoreMessageReceiver.this.receivePath, ZonedDateTime.now()), + CoreMessageReceiver.this.lastKnownLinkError); + TRACE_LOGGER.warn(operationTimedout.getMessage()); + ExceptionUtil.completeExceptionally(linkOpen.getWork(), operationTimedout, CoreMessageReceiver.this, true); } - , timeout.remaining() - , TimerType.OneTimeRun); + }, + timeout.remaining(), + TimerType.OneTimeRun); } - private void scheduleLinkCloseTimeout(final TimeoutTracker timeout) - { + private void scheduleLinkCloseTimeout(final TimeoutTracker timeout) { // timer to signal a timeout if exceeds the operationTimeout on MessagingFactory Timer.schedule( - new Runnable() - { - public void run() - { - if (!linkClose.isDone()) - { - Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Receive Link(%s) timed out at %s", "Close", CoreMessageReceiver.this.receiveLink.getName(), ZonedDateTime.now())); - TRACE_LOGGER.warn(operationTimedout.getMessage()); + () -> { + if (!linkClose.isDone()) { + Exception operationTimedout = new TimeoutException(String.format(Locale.US, "%s operation on Receive Link(%s) timed out at %s", "Close", CoreMessageReceiver.this.receiveLink.getName(), ZonedDateTime.now())); + TRACE_LOGGER.warn(operationTimedout.getMessage()); - ExceptionUtil.completeExceptionally(linkClose, operationTimedout, CoreMessageReceiver.this, true); - } - } + ExceptionUtil.completeExceptionally(linkClose, operationTimedout, CoreMessageReceiver.this, true); } - , timeout.remaining() - , TimerType.OneTimeRun); + }, + timeout.remaining(), + TimerType.OneTimeRun); } @Override - public void onClose(ErrorCondition condition) - { - if (condition == null) - { + public void onClose(ErrorCondition condition) { + if (condition == null) { this.onError(new ServiceBusException(true, String.format(Locale.US, "Closing the link. LinkName(%s), EntityPath(%s)", this.receiveLink.getName(), this.receivePath))); - } - else - { + } else { Exception completionException = ExceptionUtil.toException(condition); this.onError(completionException); } } @Override - public ErrorContext getContext() - { + public ErrorContext getContext() { final boolean isLinkOpened = this.linkOpen != null && this.linkOpen.getWork().isDone(); final String referenceId = this.receiveLink != null && this.receiveLink.getRemoteProperties() != null && this.receiveLink.getRemoteProperties().containsKey(ClientConstants.TRACKING_ID_PROPERTY) ? this.receiveLink.getRemoteProperties().get(ClientConstants.TRACKING_ID_PROPERTY).toString() @@ -970,42 +800,34 @@ public ErrorContext getContext() ReceiverErrorContext errorContext = new ReceiverErrorContext(this.underlyingFactory != null ? this.underlyingFactory.getHostName() : null, this.receivePath, referenceId, - isLinkOpened ? this.prefetchCount : null, - isLinkOpened && this.receiveLink != null ? this.receiveLink.getCredit(): null, - this.currentPrefetechedMessagesCount.get()); + isLinkOpened ? this.prefetchCount : null, + isLinkOpened && this.receiveLink != null ? this.receiveLink.getCredit(): null, + this.currentPrefetechedMessagesCount.get()); return errorContext; } @Override - protected CompletableFuture onClose() - { + protected CompletableFuture onClose() { this.closeInternals(true); return this.linkClose; } - private void closeInternals(boolean waitForCloseCompletion) - { - if (!this.getIsClosed()) - { - if (this.receiveLink != null && this.receiveLink.getLocalState() != EndpointState.CLOSED) - { + private void closeInternals(boolean waitForCloseCompletion) { + if (!this.getIsClosed()) { + if (this.receiveLink != null && this.receiveLink.getLocalState() != EndpointState.CLOSED) { try { this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { @Override public void onEvent() { - if (CoreMessageReceiver.this.receiveLink != null && CoreMessageReceiver.this.receiveLink.getLocalState() != EndpointState.CLOSED) - { + if (CoreMessageReceiver.this.receiveLink != null && CoreMessageReceiver.this.receiveLink.getLocalState() != EndpointState.CLOSED) { TRACE_LOGGER.info("Closing receive link to '{}'", CoreMessageReceiver.this.receivePath); CoreMessageReceiver.this.receiveLink.close(); CoreMessageReceiver.this.underlyingFactory.deregisterForConnectionError(CoreMessageReceiver.this.receiveLink); - if(waitForCloseCompletion) - { + if (waitForCloseCompletion) { CoreMessageReceiver.this.scheduleLinkCloseTimeout(TimeoutTracker.create(CoreMessageReceiver.this.operationTimeout)); - } - else - { + } else { AsyncUtil.completeFuture(CoreMessageReceiver.this.linkClose, null); } } @@ -1014,9 +836,7 @@ public void onEvent() { } catch (IOException e) { AsyncUtil.completeFutureExceptionally(this.linkClose, e); } - } - else - { + } else { AsyncUtil.completeFuture(this.linkClose, null); } @@ -1030,8 +850,7 @@ public void onEvent() { /* This is to be used for messages which are received on receiveLink. */ - public CompletableFuture completeMessageAsync(byte[] deliveryTag, TransactionContext transaction) - { + public CompletableFuture completeMessageAsync(byte[] deliveryTag, TransactionContext transaction) { Outcome outcome = Accepted.getInstance(); return this.updateMessageStateAsync(deliveryTag, outcome, transaction); } @@ -1039,8 +858,7 @@ public CompletableFuture completeMessageAsync(byte[] deliveryTag, Transact /* This is to be used for messages which are received on RequestResponseLink */ - public CompletableFuture completeMessageAsync(UUID lockToken, TransactionContext transaction) - { + public CompletableFuture completeMessageAsync(UUID lockToken, TransactionContext transaction) { return this.updateDispositionAsync( new UUID[]{lockToken}, ClientConstants.DISPOSITION_STATUS_COMPLETED, @@ -1050,40 +868,34 @@ public CompletableFuture completeMessageAsync(UUID lockToken, TransactionC transaction); } - public CompletableFuture abandonMessageAsync(byte[] deliveryTag, Map propertiesToModify, TransactionContext transaction) - { + public CompletableFuture abandonMessageAsync(byte[] deliveryTag, Map propertiesToModify, TransactionContext transaction) { Modified outcome = new Modified(); - if(propertiesToModify != null) - { + if (propertiesToModify != null) { outcome.setMessageAnnotations(propertiesToModify); } return this.updateMessageStateAsync(deliveryTag, outcome, transaction); } - public CompletableFuture abandonMessageAsync(UUID lockToken, Map propertiesToModify, TransactionContext transaction) - { + public CompletableFuture abandonMessageAsync(UUID lockToken, Map propertiesToModify, TransactionContext transaction) { return this.updateDispositionAsync( new UUID[]{lockToken}, ClientConstants.DISPOSITION_STATUS_ABANDONED, - null - , null, + null, + null, propertiesToModify, transaction); } - public CompletableFuture deferMessageAsync(byte[] deliveryTag, Map propertiesToModify, TransactionContext transaction) - { + public CompletableFuture deferMessageAsync(byte[] deliveryTag, Map propertiesToModify, TransactionContext transaction) { Modified outcome = new Modified(); outcome.setUndeliverableHere(true); - if(propertiesToModify != null) - { + if (propertiesToModify != null) { outcome.setMessageAnnotations(propertiesToModify); } return this.updateMessageStateAsync(deliveryTag, outcome, transaction); } - public CompletableFuture deferMessageAsync(UUID lockToken, Map propertiesToModify, TransactionContext transaction) - { + public CompletableFuture deferMessageAsync(UUID lockToken, Map propertiesToModify, TransactionContext transaction) { return this.updateDispositionAsync( new UUID[]{lockToken}, ClientConstants.DISPOSITION_STATUS_DEFERED, @@ -1098,21 +910,17 @@ public CompletableFuture deadLetterMessageAsync( String deadLetterReason, String deadLetterErrorDescription, Map propertiesToModify, - TransactionContext transaction) - { + TransactionContext transaction) { Rejected outcome = new Rejected(); ErrorCondition error = new ErrorCondition(ClientConstants.DEADLETTERNAME, null); - Map errorInfo = new HashMap(); - if(!StringUtil.isNullOrEmpty(deadLetterReason)) - { + Map errorInfo = new HashMap<>(); + if (!StringUtil.isNullOrEmpty(deadLetterReason)) { errorInfo.put(ClientConstants.DEADLETTER_REASON_HEADER, deadLetterReason); } - if(!StringUtil.isNullOrEmpty(deadLetterErrorDescription)) - { + if (!StringUtil.isNullOrEmpty(deadLetterErrorDescription)) { errorInfo.put(ClientConstants.DEADLETTER_ERROR_DESCRIPTION_HEADER, deadLetterErrorDescription); } - if(propertiesToModify != null) - { + if (propertiesToModify != null) { errorInfo.putAll(propertiesToModify); } error.setInfo(errorInfo); @@ -1126,8 +934,7 @@ public CompletableFuture deadLetterMessageAsync( String deadLetterReason, String deadLetterErrorDescription, Map propertiesToModify, - TransactionContext transaction) - { + TransactionContext transaction) { return this.updateDispositionAsync( new UUID[]{lockToken}, ClientConstants.DISPOSITION_STATUS_SUSPENDED, @@ -1137,21 +944,17 @@ public CompletableFuture deadLetterMessageAsync( transaction); } - private CompletableFuture updateMessageStateAsync(byte[] deliveryTag, Outcome outcome, TransactionContext transaction) - { + private CompletableFuture updateMessageStateAsync(byte[] deliveryTag, Outcome outcome, TransactionContext transaction) { this.throwIfInUnusableState(); - CompletableFuture completeMessageFuture = new CompletableFuture(); + CompletableFuture completeMessageFuture = new CompletableFuture<>(); String deliveryTagAsString = StringUtil.convertBytesToString(deliveryTag); TRACE_LOGGER.debug("Updating message state of delivery '{}' to '{}'", deliveryTagAsString, outcome); Delivery delivery = CoreMessageReceiver.this.tagsToDeliveriesMap.get(deliveryTagAsString); - if(delivery == null) - { + if (delivery == null) { TRACE_LOGGER.error("Delivery not found for delivery tag '{}'. Either receive link to '{}' closed with a transient error and reopened or the delivery was already settled by complete/abandon/defer/deadletter.", deliveryTagAsString, this.receivePath); completeMessageFuture.completeExceptionally(generateDeliveryNotFoundException()); - } - else - { + } else { DeliveryState state; if (transaction != TransactionContext.NULL_TXN) { state = new TransactionalState(); @@ -1165,12 +968,9 @@ private CompletableFuture updateMessageStateAsync(byte[] deliveryTag, Outc CoreMessageReceiver.this.pendingUpdateStateRequests.put(deliveryTagAsString, workItem); CoreMessageReceiver.this.ensureLinkIsOpen().thenRun(() -> { - try - { + try { this.underlyingFactory.scheduleOnReactorThread(new DeliveryStateDispatchHandler(delivery, state)); - } - catch (IOException ioException) - { + } catch (IOException ioException) { completeMessageFuture.completeExceptionally(generateDispatacherSchedulingFailedException("completeMessage", ioException)); } }); @@ -1179,56 +979,44 @@ private CompletableFuture updateMessageStateAsync(byte[] deliveryTag, Outc return completeMessageFuture; } - private synchronized CompletableFuture ensureLinkIsOpen() - { + private synchronized CompletableFuture ensureLinkIsOpen() { // Send SAS token before opening a link as connection might have been closed and reopened - if (!(this.receiveLink.getLocalState() == EndpointState.ACTIVE && this.receiveLink.getRemoteState() == EndpointState.ACTIVE)) - { - if(this.receiveLinkReopenFuture == null || this.receiveLinkReopenFuture.isDone()) - { + if (!(this.receiveLink.getLocalState() == EndpointState.ACTIVE && this.receiveLink.getRemoteState() == EndpointState.ACTIVE)) { + if (this.receiveLinkReopenFuture == null || this.receiveLinkReopenFuture.isDone()) { TRACE_LOGGER.info("Recreating receive link to '{}'", this.receivePath); this.retryPolicy.incrementRetryCount(this.getClientId()); - this.receiveLinkReopenFuture = new CompletableFuture(); + this.receiveLinkReopenFuture = new CompletableFuture<>(); // Variable just to be closed over by the scheduled runnable. The runnable should cancel only the closed over future, not the parent's instance variable which can change final CompletableFuture linkReopenFutureThatCanBeCancelled = this.receiveLinkReopenFuture; Timer.schedule( - () -> { - if (!linkReopenFutureThatCanBeCancelled.isDone()) - { - CoreMessageReceiver.this.cancelSASTokenRenewTimer(); - Exception operationTimedout = new TimeoutException( - String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", CoreMessageReceiver.this.receiveLink.getName(), CoreMessageReceiver.this.receivePath, ZonedDateTime.now())); - - TRACE_LOGGER.warn(operationTimedout.getMessage()); - AsyncUtil.completeFutureExceptionally(linkReopenFutureThatCanBeCancelled, operationTimedout); - } + () -> { + if (!linkReopenFutureThatCanBeCancelled.isDone()) { + CoreMessageReceiver.this.cancelSASTokenRenewTimer(); + Exception operationTimedout = new TimeoutException( + String.format(Locale.US, "%s operation on ReceiveLink(%s) to path(%s) timed out at %s.", "Open", CoreMessageReceiver.this.receiveLink.getName(), CoreMessageReceiver.this.receivePath, ZonedDateTime.now())); + + TRACE_LOGGER.warn(operationTimedout.getMessage()); + AsyncUtil.completeFutureExceptionally(linkReopenFutureThatCanBeCancelled, operationTimedout); } - , CoreMessageReceiver.LINK_REOPEN_TIMEOUT - , TimerType.OneTimeRun); + }, + CoreMessageReceiver.LINK_REOPEN_TIMEOUT, + TimerType.OneTimeRun); this.cancelSASTokenRenewTimer(); this.sendTokenAndSetRenewTimer(false).handleAsync((v, sendTokenEx) -> { - if(sendTokenEx != null) - { + if (sendTokenEx != null) { Throwable cause = ExceptionUtil.extractAsyncCompletionCause(sendTokenEx); TRACE_LOGGER.error("Sending SAS Token to '{}' failed.", this.receivePath, cause); this.receiveLinkReopenFuture.completeExceptionally(sendTokenEx); this.clearAllPendingWorkItems(sendTokenEx); - } - else - { - try - { - this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() - { + } else { + try { + this.underlyingFactory.scheduleOnReactorThread(new DispatchHandler() { @Override - public void onEvent() - { + public void onEvent() { CoreMessageReceiver.this.createReceiveLink(); } }); - } - catch (IOException ioEx) - { + } catch (IOException ioEx) { this.receiveLinkReopenFuture.completeExceptionally(ioEx); } } @@ -1237,26 +1025,20 @@ public void onEvent() } return this.receiveLinkReopenFuture; - } - else - { + } else { return CompletableFuture.completedFuture(null); } } - private void completePendingUpdateStateWorkItem(Delivery delivery, String deliveryTagAsString, UpdateStateWorkItem workItem, Exception exception) - { + private void completePendingUpdateStateWorkItem(Delivery delivery, String deliveryTagAsString, UpdateStateWorkItem workItem, Exception exception) { boolean isSettled = delivery.remotelySettled(); if (isSettled) { delivery.settle(); } - if(exception == null) - { + if (exception == null) { AsyncUtil.completeFuture(workItem.getWork(), null); - } - else - { + } else { ExceptionUtil.completeExceptionally(workItem.getWork(), exception, this, true); } @@ -1266,80 +1048,65 @@ private void completePendingUpdateStateWorkItem(Delivery delivery, String delive } } - private void clearAllPendingWorkItems(Throwable exception) - { + private void clearAllPendingWorkItems(Throwable exception) { TRACE_LOGGER.info("Completeing all pending receive and updateState operation on the receiver to '{}'", this.receivePath); final boolean isTransientException = exception == null || (exception instanceof ServiceBusException && ((ServiceBusException) exception).getIsTransient()); Iterator pendingRecivesIterator = this.pendingReceives.iterator(); - while(pendingRecivesIterator.hasNext()) - { + while (pendingRecivesIterator.hasNext()) { ReceiveWorkItem workItem = pendingRecivesIterator.next(); pendingRecivesIterator.remove(); CompletableFuture> future = workItem.getWork(); workItem.cancelTimeoutTask(false); this.reduceCreditForCompletedReceiveRequest(workItem.getMaxMessageCount()); - if (isTransientException) - { + if (isTransientException) { AsyncUtil.completeFuture(future, null); - } - else - { + } else { ExceptionUtil.completeExceptionally(future, exception, this, true); } } - for(Map.Entry pendingUpdate : this.pendingUpdateStateRequests.entrySet()) - { + for (Map.Entry pendingUpdate : this.pendingUpdateStateRequests.entrySet()) { pendingUpdateStateRequests.remove(pendingUpdate.getKey()); ExceptionUtil.completeExceptionally(pendingUpdate.getValue().getWork(), exception, this, true); } } - private static IllegalArgumentException generateDeliveryNotFoundException() - { + private static IllegalArgumentException generateDeliveryNotFoundException() { return new IllegalArgumentException("Delivery not found on the receive link."); } - private static ServiceBusException generateDispatacherSchedulingFailedException(String operation, Exception cause) - { + private static ServiceBusException generateDispatacherSchedulingFailedException(String operation, Exception cause) { return new ServiceBusException(false, operation + " failed while dispatching to Reactor, see cause for more details.", cause); } - public CompletableFuture> renewMessageLocksAsync(UUID[] lockTokens) - { + public CompletableFuture> renewMessageLocksAsync(UUID[] lockTokens) { this.throwIfInUnusableState(); - if(TRACE_LOGGER.isDebugEnabled()) - { + if (TRACE_LOGGER.isDebugEnabled()) { TRACE_LOGGER.debug("Renewing message locks for lock tokens '{}' of entity '{}', sesion '{}'", Arrays.toString(lockTokens), this.receivePath, this.isSessionReceiver ? this.getSessionId() : ""); } return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> { HashMap requestBodyMap = new HashMap(); requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_LOCKTOKENS, lockTokens); - if(this.isSessionReceiver) - { + if (this.isSessionReceiver) { requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId()); } Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_RENEWLOCK_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName()); CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout); return responseFuture.thenComposeAsync((responseMessage) -> { - CompletableFuture> returningFuture = new CompletableFuture>(); + CompletableFuture> returningFuture = new CompletableFuture<>(); int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage); - if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) - { - if(TRACE_LOGGER.isDebugEnabled()) - { + if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) { + if (TRACE_LOGGER.isDebugEnabled()) { TRACE_LOGGER.debug("Message locks for lock tokens '{}' renewed", Arrays.toString(lockTokens)); } Date[] expirations = (Date[])RequestResponseUtils.getResponseBody(responseMessage).get(ClientConstants.REQUEST_RESPONSE_EXPIRATIONS); returningFuture.complete(Arrays.stream(expirations).map((d) -> d.toInstant()).collect(Collectors.toList())); - } - else - { + } else { // error response Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage); TRACE_LOGGER.error("Renewing message locks for lock tokens '{}' on entity '{}' failed", Arrays.toString(lockTokens), this.receivePath, failureException); @@ -1350,50 +1117,40 @@ public CompletableFuture> renewMessageLocksAsync(UUID[] lock }, MessagingFactory.INTERNAL_THREAD_POOL); } - public CompletableFuture> receiveDeferredMessageBatchAsync(Long[] sequenceNumbers) - { + public CompletableFuture> receiveDeferredMessageBatchAsync(Long[] sequenceNumbers) { this.throwIfInUnusableState(); - if(TRACE_LOGGER.isDebugEnabled()) - { + if (TRACE_LOGGER.isDebugEnabled()) { TRACE_LOGGER.debug("Receiving messages for sequence numbers '{}' from entity '{}', sesion '{}'", Arrays.toString(sequenceNumbers), this.receivePath, this.isSessionReceiver ? this.getSessionId() : ""); } return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> { HashMap requestBodyMap = new HashMap(); requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SEQUENCE_NUMBERS, sequenceNumbers); requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_RECEIVER_SETTLE_MODE, UnsignedInteger.valueOf(this.settleModePair.getReceiverSettleMode() == ReceiverSettleMode.FIRST ? 0 : 1)); - if(this.isSessionReceiver) - { + if (this.isSessionReceiver) { requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId()); } Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_RECEIVE_BY_SEQUENCE_NUMBER, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName()); CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout); return responseFuture.thenComposeAsync((responseMessage) -> { - CompletableFuture> returningFuture = new CompletableFuture>(); + CompletableFuture> returningFuture = new CompletableFuture<>(); int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage); - if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) - { - if(TRACE_LOGGER.isDebugEnabled()) - { + if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) { + if (TRACE_LOGGER.isDebugEnabled()) { TRACE_LOGGER.debug("Received messges for sequence numbers '{}' from entity '{}', sesion '{}'", Arrays.toString(sequenceNumbers), this.receivePath, this.isSessionReceiver ? this.getSessionId() : ""); } - List receivedMessages = new ArrayList(); + List receivedMessages = new ArrayList<>(); Object responseBodyMap = ((AmqpValue)responseMessage.getBody()).getValue(); - if(responseBodyMap != null && responseBodyMap instanceof Map) - { + if (responseBodyMap != null && responseBodyMap instanceof Map) { Object messages = ((Map)responseBodyMap).get(ClientConstants.REQUEST_RESPONSE_MESSAGES); - if(messages != null && messages instanceof Iterable) - { - for(Object message : (Iterable)messages) - { - if(message instanceof Map) - { + if (messages != null && messages instanceof Iterable) { + for (Object message : (Iterable)messages) { + if (message instanceof Map) { Message receivedMessage = Message.Factory.create(); Binary messagePayLoad = (Binary)((Map)message).get(ClientConstants.REQUEST_RESPONSE_MESSAGE); receivedMessage.decode(messagePayLoad.getArray(), messagePayLoad.getArrayOffset(), messagePayLoad.getLength()); UUID lockToken = ClientConstants.ZEROLOCKTOKEN; - if(((Map)message).containsKey(ClientConstants.REQUEST_RESPONSE_LOCKTOKEN)) - { + if (((Map)message).containsKey(ClientConstants.REQUEST_RESPONSE_LOCKTOKEN)) { lockToken = (UUID)((Map)message).get(ClientConstants.REQUEST_RESPONSE_LOCKTOKEN); } @@ -1403,9 +1160,7 @@ public CompletableFuture> receiveDeferredMessag } } returningFuture.complete(receivedMessages); - } - else - { + } else { // error response Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage); TRACE_LOGGER.error("Receiving messages by sequence numbers '{}' from entity '{}' failed", Arrays.toString(sequenceNumbers), this.receivePath, failureException); @@ -1422,11 +1177,9 @@ public CompletableFuture updateDispositionAsync( String deadLetterReason, String deadLetterErrorDescription, Map propertiesToModify, - TransactionContext transaction) - { + TransactionContext transaction) { this.throwIfInUnusableState(); - if(TRACE_LOGGER.isDebugEnabled()) - { + if (TRACE_LOGGER.isDebugEnabled()) { TRACE_LOGGER.debug("Update disposition of deliveries '{}' to '{}' on entity '{}', sesion '{}'", Arrays.toString(lockTokens), dispositionStatus, this.receivePath, this.isSessionReceiver ? this.getSessionId() : ""); } return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> { @@ -1434,41 +1187,33 @@ public CompletableFuture updateDispositionAsync( requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_LOCKTOKENS, lockTokens); requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_DISPOSITION_STATUS, dispositionStatus); - if(deadLetterReason != null) - { + if (deadLetterReason != null) { requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_DEADLETTER_REASON, deadLetterReason); } - if(deadLetterErrorDescription != null) - { + if (deadLetterErrorDescription != null) { requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_DEADLETTER_DESCRIPTION, deadLetterErrorDescription); } - if(propertiesToModify != null && propertiesToModify.size() > 0) - { + if (propertiesToModify != null && propertiesToModify.size() > 0) { requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_PROPERTIES_TO_MODIFY, propertiesToModify); } - if(this.isSessionReceiver) - { + if (this.isSessionReceiver) { requestBodyMap.put(ClientConstants.REQUEST_RESPONSE_SESSIONID, this.getSessionId()); } Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_UPDATE_DISPOSTION_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName()); CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, transaction, this.operationTimeout); return responseFuture.thenComposeAsync((responseMessage) -> { - CompletableFuture returningFuture = new CompletableFuture(); + CompletableFuture returningFuture = new CompletableFuture<>(); int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage); - if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) - { - if(TRACE_LOGGER.isDebugEnabled()) - { + if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) { + if (TRACE_LOGGER.isDebugEnabled()) { TRACE_LOGGER.debug("Update disposition of deliveries '{}' to '{}' on entity '{}', sesion '{}' succeeded.", Arrays.toString(lockTokens), dispositionStatus, this.receivePath, this.isSessionReceiver ? this.getSessionId() : ""); } returningFuture.complete(null); - } - else - { + } else { // error response Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage); TRACE_LOGGER.error("Update disposition on entity '{}' failed", this.receivePath, failureException); @@ -1479,8 +1224,7 @@ public CompletableFuture updateDispositionAsync( }, MessagingFactory.INTERNAL_THREAD_POOL); } - public CompletableFuture renewSessionLocksAsync() - { + public CompletableFuture renewSessionLocksAsync() { this.throwIfInUnusableState(); TRACE_LOGGER.debug("Renewing session lock on entity '{}' of sesion '{}'", this.receivePath, this.getSessionId()); return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> { @@ -1490,17 +1234,14 @@ public CompletableFuture renewSessionLocksAsync() Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_RENEW_SESSIONLOCK_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName()); CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout); return responseFuture.thenComposeAsync((responseMessage) -> { - CompletableFuture returningFuture = new CompletableFuture(); + CompletableFuture returningFuture = new CompletableFuture<>(); int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage); - if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) - { + if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) { Date expiration = (Date)RequestResponseUtils.getResponseBody(responseMessage).get(ClientConstants.REQUEST_RESPONSE_EXPIRATION); this.sessionLockedUntilUtc = expiration.toInstant(); TRACE_LOGGER.debug("Session lock on entity '{}' of sesion '{}' renewed until '{}'", this.receivePath, this.getSessionId(), this.sessionLockedUntilUtc); returningFuture.complete(null); - } - else - { + } else { // error response Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage); TRACE_LOGGER.error("Renewing session lock on entity '{}' of sesion '{}' failed", this.receivePath, this.getSessionId(), failureException); @@ -1511,8 +1252,7 @@ public CompletableFuture renewSessionLocksAsync() }, MessagingFactory.INTERNAL_THREAD_POOL); } - public CompletableFuture getSessionStateAsync() - { + public CompletableFuture getSessionStateAsync() { this.throwIfInUnusableState(); TRACE_LOGGER.debug("Getting session state of sesion '{}' from entity '{}'", this.getSessionId(), this.receivePath); return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> { @@ -1522,26 +1262,21 @@ public CompletableFuture getSessionStateAsync() Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_GET_SESSION_STATE_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName()); CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout); return responseFuture.thenComposeAsync((responseMessage) -> { - CompletableFuture returningFuture = new CompletableFuture(); + CompletableFuture returningFuture = new CompletableFuture<>(); int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage); - if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) - { + if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) { TRACE_LOGGER.debug("Got session state of sesion '{}' from entity '{}'", this.getSessionId(), this.receivePath); byte[] receivedState = null; Map bodyMap = RequestResponseUtils.getResponseBody(responseMessage); - if(bodyMap.containsKey(ClientConstants.REQUEST_RESPONSE_SESSION_STATE)) - { + if (bodyMap.containsKey(ClientConstants.REQUEST_RESPONSE_SESSION_STATE)) { Object sessionState = bodyMap.get(ClientConstants.REQUEST_RESPONSE_SESSION_STATE); - if(sessionState != null) - { + if (sessionState != null) { receivedState = ((Binary)sessionState).getArray(); } } returningFuture.complete(receivedState); - } - else - { + } else { // error response Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage); TRACE_LOGGER.error("Getting session state of sesion '{}' from entity '{}' failed", this.getSessionId(), this.receivePath, failureException); @@ -1553,8 +1288,7 @@ public CompletableFuture getSessionStateAsync() } // NULL session state is allowed - public CompletableFuture setSessionStateAsync(byte[] sessionState) - { + public CompletableFuture setSessionStateAsync(byte[] sessionState) { this.throwIfInUnusableState(); TRACE_LOGGER.debug("Setting session state of sesion '{}' on entity '{}'", this.getSessionId(), this.receivePath); return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> { @@ -1565,15 +1299,12 @@ public CompletableFuture setSessionStateAsync(byte[] sessionState) Message requestMessage = RequestResponseUtils.createRequestMessageFromPropertyBag(ClientConstants.REQUEST_RESPONSE_SET_SESSION_STATE_OPERATION, requestBodyMap, Util.adjustServerTimeout(this.operationTimeout), this.receiveLink.getName()); CompletableFuture responseFuture = this.requestResponseLink.requestAysnc(requestMessage, TransactionContext.NULL_TXN, this.operationTimeout); return responseFuture.thenComposeAsync((responseMessage) -> { - CompletableFuture returningFuture = new CompletableFuture(); + CompletableFuture returningFuture = new CompletableFuture<>(); int statusCode = RequestResponseUtils.getResponseStatusCode(responseMessage); - if(statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) - { + if (statusCode == ClientConstants.REQUEST_RESPONSE_OK_STATUS_CODE) { TRACE_LOGGER.debug("Setting session state of sesion '{}' on entity '{}' succeeded", this.getSessionId(), this.receivePath); returningFuture.complete(null); - } - else - { + } else { // error response Exception failureException = RequestResponseUtils.genereateExceptionFromResponse(responseMessage); TRACE_LOGGER.error("Setting session state of sesion '{}' on entity '{}' failed", this.getSessionId(), this.receivePath, failureException); @@ -1585,12 +1316,11 @@ public CompletableFuture setSessionStateAsync(byte[] sessionState) } // A receiver can be used to peek messages from any session-id, useful for browsable sessions - public CompletableFuture> peekMessagesAsync(long fromSequenceNumber, int messageCount, String sessionId) - { + public CompletableFuture> peekMessagesAsync(long fromSequenceNumber, int messageCount, String sessionId) { this.throwIfInUnusableState(); - return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> { - return CommonRequestResponseOperations.peekMessagesAsync(this.requestResponseLink, this.operationTimeout, fromSequenceNumber, messageCount, sessionId, this.receiveLink.getName()); - }, MessagingFactory.INTERNAL_THREAD_POOL); + return this.createRequestResponseLinkAsync().thenComposeAsync((v) -> + CommonRequestResponseOperations.peekMessagesAsync(this.requestResponseLink, this.operationTimeout, fromSequenceNumber, messageCount, sessionId, this.receiveLink.getName()), + MessagingFactory.INTERNAL_THREAD_POOL); } private static class DeliveryStateDispatchHandler extends DispatchHandler {