diff --git a/eng/jacoco-test-coverage/pom.xml b/eng/jacoco-test-coverage/pom.xml
index 73828a1aa49e..1a7cad4acdb4 100644
--- a/eng/jacoco-test-coverage/pom.xml
+++ b/eng/jacoco-test-coverage/pom.xml
@@ -109,7 +109,7 @@
com.azure
azure-core-amqp
- 2.1.0-beta.1
+ 2.2.0
com.azure
diff --git a/eng/versioning/version_client.txt b/eng/versioning/version_client.txt
index 9e5709f9db1e..9d429f229c67 100644
--- a/eng/versioning/version_client.txt
+++ b/eng/versioning/version_client.txt
@@ -58,7 +58,7 @@ com.azure:azure-communication-identity;1.1.0;1.2.0-beta.1
com.azure:azure-communication-phonenumbers;1.0.1;1.1.0-beta.1
com.azure:azure-containers-containerregistry;1.0.0-beta.2;1.0.0-beta.3
com.azure:azure-core;1.16.0;1.17.0
-com.azure:azure-core-amqp;2.0.6;2.1.0-beta.1
+com.azure:azure-core-amqp;2.0.6;2.2.0
com.azure:azure-core-amqp-experimental;1.0.0-beta.1;1.0.0-beta.1
com.azure:azure-core-experimental;1.0.0-beta.13;1.0.0-beta.14
com.azure:azure-core-http-jdk-httpclient;1.0.0-beta.1;1.0.0-beta.1
diff --git a/sdk/core/azure-core-amqp-experimental/pom.xml b/sdk/core/azure-core-amqp-experimental/pom.xml
index 551613c7334a..40efe856cd7f 100644
--- a/sdk/core/azure-core-amqp-experimental/pom.xml
+++ b/sdk/core/azure-core-amqp-experimental/pom.xml
@@ -58,7 +58,7 @@
com.azure
azure-core-amqp
- 2.1.0-beta.1
+ 2.2.0
diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md
index 6eba1b8a641a..9e302e6c43b8 100644
--- a/sdk/core/azure-core-amqp/CHANGELOG.md
+++ b/sdk/core/azure-core-amqp/CHANGELOG.md
@@ -1,15 +1,11 @@
# Release History
-## 2.1.0-beta.1 (Unreleased)
-
+## 2.2.0 (2021-06-10)
### New Features
-- Exposing CbsAuthorizationType.
-- Exposing ManagementNode that can perform management and metadata operations on an AMQP message broker.
-- AmqpConnection, AmqpSession, AmqpSendLink, and AmqpReceiveLink extend from AsyncCloseable.
-- Delivery outcomes and delivery states are added.
+- Adding support for AMQP data types SEQUENCE and VALUE.
-### Bug Fixes
-- Fixed a bug where connection and sessions would not be disposed when their endpoint closed.
+### Dependency Updates
+- Upgraded `azure-core` dependency to `1.15.0`.
## 2.0.6 (2021-05-24)
### Bug Fixes
@@ -22,13 +18,6 @@
- Upgraded `azure-core` from `1.15.0` to `1.16.0`.
- Upgraded Reactor from `3.4.3` to `3.4.5`.
-## 2.2.0-beta.1 (2021-04-14)
-### New Features
-- Adding support for AMQP data types SEQUENCE and VALUE.
-
-### Dependency Updates
-- Upgraded `azure-core` dependency to `1.15.0`.
-
## 2.0.4 (2021-04-12)
### Bug Fixes
diff --git a/sdk/core/azure-core-amqp/pom.xml b/sdk/core/azure-core-amqp/pom.xml
index 5b1572cc945f..0aaac0644b42 100644
--- a/sdk/core/azure-core-amqp/pom.xml
+++ b/sdk/core/azure-core-amqp/pom.xml
@@ -14,7 +14,7 @@
com.azure
azure-core-amqp
- 2.1.0-beta.1
+ 2.2.0
jar
Microsoft Azure Java Core AMQP Library
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ManagementChannel.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ManagementChannel.java
index f469f879dede..04145e4ece7b 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ManagementChannel.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ManagementChannel.java
@@ -116,14 +116,13 @@ private Mono isAuthorized() {
.next()
.switchIfEmpty(Mono.error(new AmqpException(false, "Did not get response from tokenManager: " + entityPath, getErrorContext())))
.handle((response, sink) -> {
- if (response != AmqpResponseCode.ACCEPTED && response != AmqpResponseCode.OK) {
+ if (RequestResponseUtils.isSuccessful(response)) {
+ sink.complete();
+ } else {
final String message = String.format("User does not have authorization to perform operation "
+ "on entity [%s]. Response: [%s]", entityPath, response);
sink.error(ExceptionUtil.amqpResponseCodeToException(response.getValue(), message,
getErrorContext()));
-
- } else {
- sink.complete();
}
});
}
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java
index 4e9bf0b850a1..53d5f7a24135 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java
@@ -464,17 +464,12 @@ private synchronized void closeConnectionWork() {
}
connection.close();
+ handler.close();
final ArrayList> closingSessions = new ArrayList<>();
sessionMap.values().forEach(link -> closingSessions.add(link.isClosed()));
- final Mono closedExecutor;
- if (executor != null) {
- closedExecutor = executor.isClosed();
- executor.close();
- } else {
- closedExecutor = Mono.empty();
- }
+ final Mono closedExecutor = executor != null ? executor.closeAsync() : Mono.empty();
// Close all the children.
final Mono closeSessionsMono = Mono.when(closingSessions)
@@ -491,7 +486,6 @@ private synchronized void closeConnectionWork() {
return false;
});
- handler.close();
subscriptions.dispose();
}));
@@ -521,10 +515,6 @@ private synchronized Connection getOrCreateConnection() throws IOException {
final ReactorExceptionHandler reactorExceptionHandler = new ReactorExceptionHandler();
- reactorProvider.getReactorDispatcher().getShutdownSignal()
- .subscribe(signal -> reactorExceptionHandler.onConnectionShutdown(signal),
- error -> reactorExceptionHandler.onConnectionError(error));
-
// Use a new single-threaded scheduler for this connection as QPID's Reactor is not thread-safe.
// Using Schedulers.single() will use the same thread for all connections in this process which
// limits the scalability of the no. of concurrent connections a single process can have.
@@ -539,6 +529,22 @@ private synchronized Connection getOrCreateConnection() throws IOException {
reactorExceptionHandler, pendingTasksDuration,
connectionOptions.getFullyQualifiedNamespace());
+ // To avoid inconsistent synchronization of executor, we set this field with the closeAsync method.
+ // It will not be kicked off until subscribed to.
+ final Mono executorCloseMono = executor.closeAsync();
+ reactorProvider.getReactorDispatcher().getShutdownSignal()
+ .flatMap(signal -> {
+ logger.info("Shutdown signal received from reactor provider.");
+ reactorExceptionHandler.onConnectionShutdown(signal);
+ return executorCloseMono;
+ })
+ .onErrorResume(error -> {
+ logger.info("Error received from reactor provider.", error);
+ reactorExceptionHandler.onConnectionError(error);
+ return executorCloseMono;
+ })
+ .subscribe();
+
executor.start();
}
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java
index 834477d827db..7b47dbc5c4e6 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java
@@ -6,6 +6,7 @@
import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
+import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.engine.HandlerException;
@@ -14,7 +15,6 @@
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;
-import java.io.Closeable;
import java.nio.channels.UnresolvedAddressException;
import java.time.Duration;
import java.util.Locale;
@@ -23,7 +23,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-class ReactorExecutor implements Closeable {
+/**
+ * Schedules the proton-j reactor to continuously run work.
+ */
+class ReactorExecutor implements AsyncCloseable {
private static final String LOG_MESSAGE = "connectionId[{}], message[{}]";
private final ClientLogger logger = new ClientLogger(ReactorExecutor.class);
@@ -51,7 +54,7 @@ class ReactorExecutor implements Closeable {
/**
* Starts the reactor and will begin processing any reactor events until there are no longer any left or {@link
- * #close()} is called.
+ * #closeAsync()} is called.
*/
void start() {
if (hasStarted.getAndSet(true)) {
@@ -142,10 +145,6 @@ private void run() {
}
}
- Mono isClosed() {
- return isClosedMono.asMono();
- }
-
/**
* Schedules the release of the current reactor after operation timeout has elapsed.
*/
@@ -175,26 +174,27 @@ private void scheduleCompletePendingTasks() {
}, timeout.toMillis(), TimeUnit.MILLISECONDS);
}
- @Override
- public void close() {
- if (isDisposed.getAndSet(true)) {
- return;
- }
-
- if (hasStarted.get()) {
- scheduleCompletePendingTasks();
- }
- }
-
private void close(String reason) {
logger.verbose("Completing close and disposing scheduler. {}", reason);
-
+ scheduler.dispose();
isClosedMono.emitEmpty((signalType, emitResult) -> {
logger.verbose("signalType[{}] emitResult[{}]: Unable to emit close event on reactor", signalType,
emitResult);
return false;
});
exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, false, reason));
- scheduler.dispose();
+ }
+
+ @Override
+ public Mono closeAsync() {
+ if (isDisposed.getAndSet(true)) {
+ return isClosedMono.asMono();
+ }
+
+ if (hasStarted.get()) {
+ scheduleCompletePendingTasks();
+ }
+
+ return isClosedMono.asMono();
}
}
diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseUtils.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseUtils.java
index d61b8594b9de..d0c2fead82a1 100644
--- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseUtils.java
+++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseUtils.java
@@ -28,6 +28,10 @@ public class RequestResponseUtils {
public static boolean isSuccessful(Message message) {
final AmqpResponseCode statusCode = getStatusCode(message);
+ return isSuccessful(statusCode);
+ }
+
+ public static boolean isSuccessful(AmqpResponseCode statusCode) {
return statusCode == AmqpResponseCode.OK || statusCode == AmqpResponseCode.ACCEPTED;
}
diff --git a/sdk/core/azure-core-amqp/src/samples/java/com/azure/core/amqp/models/AmqpAnnotatedMessageJavaDocCodeSamples.java b/sdk/core/azure-core-amqp/src/samples/java/com/azure/core/amqp/models/AmqpAnnotatedMessageJavaDocCodeSamples.java
index a37e5b9a10c6..31c426234899 100644
--- a/sdk/core/azure-core-amqp/src/samples/java/com/azure/core/amqp/models/AmqpAnnotatedMessageJavaDocCodeSamples.java
+++ b/sdk/core/azure-core-amqp/src/samples/java/com/azure/core/amqp/models/AmqpAnnotatedMessageJavaDocCodeSamples.java
@@ -3,6 +3,9 @@
package com.azure.core.amqp.models;
+import org.junit.jupiter.api.Test;
+import java.nio.charset.StandardCharsets;
+
/**
* Class contains sample code snippets that will be used in javadocs.
*/
@@ -10,8 +13,10 @@ public class AmqpAnnotatedMessageJavaDocCodeSamples {
/**
* Get message body from {@link AmqpAnnotatedMessage}.
*/
+ @Test
public void checkBodyType() {
- AmqpAnnotatedMessage amqpAnnotatedMessage = null;
+ AmqpAnnotatedMessage amqpAnnotatedMessage =
+ new AmqpAnnotatedMessage(AmqpMessageBody.fromData("my-amqp-message".getBytes(StandardCharsets.UTF_8)));
// BEGIN: com.azure.core.amqp.models.AmqpBodyType.checkBodyType
// If client do not check `AmqpMessageBody.getBodyType()` and payload is not of type `AmqpMessageBodyType.DATA`,
// calling `getFirstData()` or `getData()` on `AmqpMessageBody` will throw Runtime exception.
diff --git a/sdk/core/pom.xml b/sdk/core/pom.xml
index 4bc74600871f..daa5c2b145c3 100644
--- a/sdk/core/pom.xml
+++ b/sdk/core/pom.xml
@@ -37,7 +37,7 @@
com.azure
azure-core-amqp
- 2.1.0-beta.1
+ 2.2.0
com.azure
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java
index 9fd59529d17e..254e37b16e66 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java
@@ -162,14 +162,14 @@ private Mono getProperties(Map properties, Class respo
.handle((message, sink) -> {
if (RequestResponseUtils.isSuccessful(message)) {
sink.next(messageSerializer.deserialize(message, responseType));
- }
-
- final AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(message);
- final String statusDescription = RequestResponseUtils.getStatusDescription(message);
- final Throwable error = ExceptionUtil.amqpResponseCodeToException(statusCode.getValue(),
- statusDescription, channel.getErrorContext());
+ } else {
+ final AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(message);
+ final String statusDescription = RequestResponseUtils.getStatusDescription(message);
+ final Throwable error = ExceptionUtil.amqpResponseCodeToException(statusCode.getValue(),
+ statusDescription, channel.getErrorContext());
- sink.error(logger.logExceptionAsWarning(Exceptions.propagate(error)));
+ sink.error(logger.logExceptionAsWarning(Exceptions.propagate(error)));
+ }
}));
});
}