From a90b19e83aeccc0405f9c9043ac52f9efa042773 Mon Sep 17 00:00:00 2001 From: Connie Yau Date: Mon, 7 Jun 2021 23:56:17 -0700 Subject: [PATCH 1/4] Fix Management Bug (#22122) * Adding missing return statement. * Using common logic for status codes. * Adding isSuccessful. --- .../amqp/implementation/ManagementChannel.java | 7 +++---- .../amqp/implementation/RequestResponseUtils.java | 4 ++++ .../implementation/ManagementChannel.java | 14 +++++++------- 3 files changed, 14 insertions(+), 11 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ManagementChannel.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ManagementChannel.java index f469f879dede..04145e4ece7b 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ManagementChannel.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ManagementChannel.java @@ -116,14 +116,13 @@ private Mono isAuthorized() { .next() .switchIfEmpty(Mono.error(new AmqpException(false, "Did not get response from tokenManager: " + entityPath, getErrorContext()))) .handle((response, sink) -> { - if (response != AmqpResponseCode.ACCEPTED && response != AmqpResponseCode.OK) { + if (RequestResponseUtils.isSuccessful(response)) { + sink.complete(); + } else { final String message = String.format("User does not have authorization to perform operation " + "on entity [%s]. Response: [%s]", entityPath, response); sink.error(ExceptionUtil.amqpResponseCodeToException(response.getValue(), message, getErrorContext())); - - } else { - sink.complete(); } }); } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseUtils.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseUtils.java index d61b8594b9de..d0c2fead82a1 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseUtils.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseUtils.java @@ -28,6 +28,10 @@ public class RequestResponseUtils { public static boolean isSuccessful(Message message) { final AmqpResponseCode statusCode = getStatusCode(message); + return isSuccessful(statusCode); + } + + public static boolean isSuccessful(AmqpResponseCode statusCode) { return statusCode == AmqpResponseCode.OK || statusCode == AmqpResponseCode.ACCEPTED; } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java index 9fd59529d17e..254e37b16e66 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java @@ -162,14 +162,14 @@ private Mono getProperties(Map properties, Class respo .handle((message, sink) -> { if (RequestResponseUtils.isSuccessful(message)) { sink.next(messageSerializer.deserialize(message, responseType)); - } - - final AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(message); - final String statusDescription = RequestResponseUtils.getStatusDescription(message); - final Throwable error = ExceptionUtil.amqpResponseCodeToException(statusCode.getValue(), - statusDescription, channel.getErrorContext()); + } else { + final AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(message); + final String statusDescription = RequestResponseUtils.getStatusDescription(message); + final Throwable error = ExceptionUtil.amqpResponseCodeToException(statusCode.getValue(), + statusDescription, channel.getErrorContext()); - sink.error(logger.logExceptionAsWarning(Exceptions.propagate(error))); + sink.error(logger.logExceptionAsWarning(Exceptions.propagate(error))); + } })); }); } From d33d106022a84daf7df295a0463e6bc0903ee949 Mon Sep 17 00:00:00 2001 From: Connie Yau Date: Thu, 10 Jun 2021 10:46:46 -0700 Subject: [PATCH 2/4] Closing reactor executor when IO pipe is closed. (#22192) * When IO sink is disposed of, close the corresponding executor. * Update ReactorExecutor to use AsyncCloseable. * Removing unused method. * Add changelog entry. --- sdk/core/azure-core-amqp/CHANGELOG.md | 9 +++++ .../implementation/ReactorConnection.java | 30 ++++++++------ .../amqp/implementation/ReactorExecutor.java | 40 +++++++++---------- 3 files changed, 47 insertions(+), 32 deletions(-) diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md index 6eba1b8a641a..25ab4caa1013 100644 --- a/sdk/core/azure-core-amqp/CHANGELOG.md +++ b/sdk/core/azure-core-amqp/CHANGELOG.md @@ -3,13 +3,22 @@ ## 2.1.0-beta.1 (Unreleased) ### New Features + - Exposing CbsAuthorizationType. - Exposing ManagementNode that can perform management and metadata operations on an AMQP message broker. - AmqpConnection, AmqpSession, AmqpSendLink, and AmqpReceiveLink extend from AsyncCloseable. - Delivery outcomes and delivery states are added. ### Bug Fixes + - Fixed a bug where connection and sessions would not be disposed when their endpoint closed. +- Fixed a bug where ReactorExecutor did not dispose of its scheduler when "IO Sink was interrupted". + +### Dependency Updates + +- Upgraded `azure-core` from `1.16.0` to `1.17.0`. +- Upgraded `proton-j` from `0.33.4` to `0.33.8`. +- Upgraded `qpid-proton-j-extensions` from `1.2.3` to `1.2.4`. ## 2.0.6 (2021-05-24) ### Bug Fixes diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java index 4e9bf0b850a1..53d5f7a24135 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java @@ -464,17 +464,12 @@ private synchronized void closeConnectionWork() { } connection.close(); + handler.close(); final ArrayList> closingSessions = new ArrayList<>(); sessionMap.values().forEach(link -> closingSessions.add(link.isClosed())); - final Mono closedExecutor; - if (executor != null) { - closedExecutor = executor.isClosed(); - executor.close(); - } else { - closedExecutor = Mono.empty(); - } + final Mono closedExecutor = executor != null ? executor.closeAsync() : Mono.empty(); // Close all the children. final Mono closeSessionsMono = Mono.when(closingSessions) @@ -491,7 +486,6 @@ private synchronized void closeConnectionWork() { return false; }); - handler.close(); subscriptions.dispose(); })); @@ -521,10 +515,6 @@ private synchronized Connection getOrCreateConnection() throws IOException { final ReactorExceptionHandler reactorExceptionHandler = new ReactorExceptionHandler(); - reactorProvider.getReactorDispatcher().getShutdownSignal() - .subscribe(signal -> reactorExceptionHandler.onConnectionShutdown(signal), - error -> reactorExceptionHandler.onConnectionError(error)); - // Use a new single-threaded scheduler for this connection as QPID's Reactor is not thread-safe. // Using Schedulers.single() will use the same thread for all connections in this process which // limits the scalability of the no. of concurrent connections a single process can have. @@ -539,6 +529,22 @@ private synchronized Connection getOrCreateConnection() throws IOException { reactorExceptionHandler, pendingTasksDuration, connectionOptions.getFullyQualifiedNamespace()); + // To avoid inconsistent synchronization of executor, we set this field with the closeAsync method. + // It will not be kicked off until subscribed to. + final Mono executorCloseMono = executor.closeAsync(); + reactorProvider.getReactorDispatcher().getShutdownSignal() + .flatMap(signal -> { + logger.info("Shutdown signal received from reactor provider."); + reactorExceptionHandler.onConnectionShutdown(signal); + return executorCloseMono; + }) + .onErrorResume(error -> { + logger.info("Error received from reactor provider.", error); + reactorExceptionHandler.onConnectionError(error); + return executorCloseMono; + }) + .subscribe(); + executor.start(); } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java index 834477d827db..7b47dbc5c4e6 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java @@ -6,6 +6,7 @@ import com.azure.core.amqp.AmqpShutdownSignal; import com.azure.core.amqp.exception.AmqpErrorContext; import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.util.AsyncCloseable; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; import org.apache.qpid.proton.engine.HandlerException; @@ -14,7 +15,6 @@ import reactor.core.publisher.Sinks; import reactor.core.scheduler.Scheduler; -import java.io.Closeable; import java.nio.channels.UnresolvedAddressException; import java.time.Duration; import java.util.Locale; @@ -23,7 +23,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -class ReactorExecutor implements Closeable { +/** + * Schedules the proton-j reactor to continuously run work. + */ +class ReactorExecutor implements AsyncCloseable { private static final String LOG_MESSAGE = "connectionId[{}], message[{}]"; private final ClientLogger logger = new ClientLogger(ReactorExecutor.class); @@ -51,7 +54,7 @@ class ReactorExecutor implements Closeable { /** * Starts the reactor and will begin processing any reactor events until there are no longer any left or {@link - * #close()} is called. + * #closeAsync()} is called. */ void start() { if (hasStarted.getAndSet(true)) { @@ -142,10 +145,6 @@ private void run() { } } - Mono isClosed() { - return isClosedMono.asMono(); - } - /** * Schedules the release of the current reactor after operation timeout has elapsed. */ @@ -175,26 +174,27 @@ private void scheduleCompletePendingTasks() { }, timeout.toMillis(), TimeUnit.MILLISECONDS); } - @Override - public void close() { - if (isDisposed.getAndSet(true)) { - return; - } - - if (hasStarted.get()) { - scheduleCompletePendingTasks(); - } - } - private void close(String reason) { logger.verbose("Completing close and disposing scheduler. {}", reason); - + scheduler.dispose(); isClosedMono.emitEmpty((signalType, emitResult) -> { logger.verbose("signalType[{}] emitResult[{}]: Unable to emit close event on reactor", signalType, emitResult); return false; }); exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, false, reason)); - scheduler.dispose(); + } + + @Override + public Mono closeAsync() { + if (isDisposed.getAndSet(true)) { + return isClosedMono.asMono(); + } + + if (hasStarted.get()) { + scheduleCompletePendingTasks(); + } + + return isClosedMono.asMono(); } } From ef3648e0780990bedb71be4b70ccb29fd95037c5 Mon Sep 17 00:00:00 2001 From: "Hong Li(MSFT)" <74638143+hongli750210@users.noreply.github.com> Date: Thu, 10 Jun 2021 02:02:09 +0800 Subject: [PATCH 3/4] Fix Azure Core Amqp Sample issue #18806 by lihong 202105271344 (#21885) --- .../models/AmqpAnnotatedMessageJavaDocCodeSamples.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/sdk/core/azure-core-amqp/src/samples/java/com/azure/core/amqp/models/AmqpAnnotatedMessageJavaDocCodeSamples.java b/sdk/core/azure-core-amqp/src/samples/java/com/azure/core/amqp/models/AmqpAnnotatedMessageJavaDocCodeSamples.java index a37e5b9a10c6..31c426234899 100644 --- a/sdk/core/azure-core-amqp/src/samples/java/com/azure/core/amqp/models/AmqpAnnotatedMessageJavaDocCodeSamples.java +++ b/sdk/core/azure-core-amqp/src/samples/java/com/azure/core/amqp/models/AmqpAnnotatedMessageJavaDocCodeSamples.java @@ -3,6 +3,9 @@ package com.azure.core.amqp.models; +import org.junit.jupiter.api.Test; +import java.nio.charset.StandardCharsets; + /** * Class contains sample code snippets that will be used in javadocs. */ @@ -10,8 +13,10 @@ public class AmqpAnnotatedMessageJavaDocCodeSamples { /** * Get message body from {@link AmqpAnnotatedMessage}. */ + @Test public void checkBodyType() { - AmqpAnnotatedMessage amqpAnnotatedMessage = null; + AmqpAnnotatedMessage amqpAnnotatedMessage = + new AmqpAnnotatedMessage(AmqpMessageBody.fromData("my-amqp-message".getBytes(StandardCharsets.UTF_8))); // BEGIN: com.azure.core.amqp.models.AmqpBodyType.checkBodyType // If client do not check `AmqpMessageBody.getBodyType()` and payload is not of type `AmqpMessageBodyType.DATA`, // calling `getFirstData()` or `getData()` on `AmqpMessageBody` will throw Runtime exception. From 24acb2dd181da6ec73db6f9c4f99382d533f8847 Mon Sep 17 00:00:00 2001 From: Connie Date: Thu, 10 Jun 2021 11:20:20 -0700 Subject: [PATCH 4/4] Running prepare-release script. --- eng/jacoco-test-coverage/pom.xml | 2 +- eng/versioning/version_client.txt | 2 +- sdk/core/azure-core-amqp-experimental/pom.xml | 2 +- sdk/core/azure-core-amqp/CHANGELOG.md | 26 +++---------------- sdk/core/azure-core-amqp/pom.xml | 2 +- sdk/core/pom.xml | 2 +- 6 files changed, 8 insertions(+), 28 deletions(-) diff --git a/eng/jacoco-test-coverage/pom.xml b/eng/jacoco-test-coverage/pom.xml index 73828a1aa49e..1a7cad4acdb4 100644 --- a/eng/jacoco-test-coverage/pom.xml +++ b/eng/jacoco-test-coverage/pom.xml @@ -109,7 +109,7 @@ com.azure azure-core-amqp - 2.1.0-beta.1 + 2.2.0 com.azure diff --git a/eng/versioning/version_client.txt b/eng/versioning/version_client.txt index 9e5709f9db1e..9d429f229c67 100644 --- a/eng/versioning/version_client.txt +++ b/eng/versioning/version_client.txt @@ -58,7 +58,7 @@ com.azure:azure-communication-identity;1.1.0;1.2.0-beta.1 com.azure:azure-communication-phonenumbers;1.0.1;1.1.0-beta.1 com.azure:azure-containers-containerregistry;1.0.0-beta.2;1.0.0-beta.3 com.azure:azure-core;1.16.0;1.17.0 -com.azure:azure-core-amqp;2.0.6;2.1.0-beta.1 +com.azure:azure-core-amqp;2.0.6;2.2.0 com.azure:azure-core-amqp-experimental;1.0.0-beta.1;1.0.0-beta.1 com.azure:azure-core-experimental;1.0.0-beta.13;1.0.0-beta.14 com.azure:azure-core-http-jdk-httpclient;1.0.0-beta.1;1.0.0-beta.1 diff --git a/sdk/core/azure-core-amqp-experimental/pom.xml b/sdk/core/azure-core-amqp-experimental/pom.xml index 551613c7334a..40efe856cd7f 100644 --- a/sdk/core/azure-core-amqp-experimental/pom.xml +++ b/sdk/core/azure-core-amqp-experimental/pom.xml @@ -58,7 +58,7 @@ com.azure azure-core-amqp - 2.1.0-beta.1 + 2.2.0 diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md index 25ab4caa1013..9e302e6c43b8 100644 --- a/sdk/core/azure-core-amqp/CHANGELOG.md +++ b/sdk/core/azure-core-amqp/CHANGELOG.md @@ -1,24 +1,11 @@ # Release History -## 2.1.0-beta.1 (Unreleased) - +## 2.2.0 (2021-06-10) ### New Features - -- Exposing CbsAuthorizationType. -- Exposing ManagementNode that can perform management and metadata operations on an AMQP message broker. -- AmqpConnection, AmqpSession, AmqpSendLink, and AmqpReceiveLink extend from AsyncCloseable. -- Delivery outcomes and delivery states are added. - -### Bug Fixes - -- Fixed a bug where connection and sessions would not be disposed when their endpoint closed. -- Fixed a bug where ReactorExecutor did not dispose of its scheduler when "IO Sink was interrupted". +- Adding support for AMQP data types SEQUENCE and VALUE. ### Dependency Updates - -- Upgraded `azure-core` from `1.16.0` to `1.17.0`. -- Upgraded `proton-j` from `0.33.4` to `0.33.8`. -- Upgraded `qpid-proton-j-extensions` from `1.2.3` to `1.2.4`. +- Upgraded `azure-core` dependency to `1.15.0`. ## 2.0.6 (2021-05-24) ### Bug Fixes @@ -31,13 +18,6 @@ - Upgraded `azure-core` from `1.15.0` to `1.16.0`. - Upgraded Reactor from `3.4.3` to `3.4.5`. -## 2.2.0-beta.1 (2021-04-14) -### New Features -- Adding support for AMQP data types SEQUENCE and VALUE. - -### Dependency Updates -- Upgraded `azure-core` dependency to `1.15.0`. - ## 2.0.4 (2021-04-12) ### Bug Fixes diff --git a/sdk/core/azure-core-amqp/pom.xml b/sdk/core/azure-core-amqp/pom.xml index 5b1572cc945f..0aaac0644b42 100644 --- a/sdk/core/azure-core-amqp/pom.xml +++ b/sdk/core/azure-core-amqp/pom.xml @@ -14,7 +14,7 @@ com.azure azure-core-amqp - 2.1.0-beta.1 + 2.2.0 jar Microsoft Azure Java Core AMQP Library diff --git a/sdk/core/pom.xml b/sdk/core/pom.xml index 4bc74600871f..daa5c2b145c3 100644 --- a/sdk/core/pom.xml +++ b/sdk/core/pom.xml @@ -37,7 +37,7 @@ com.azure azure-core-amqp - 2.1.0-beta.1 + 2.2.0 com.azure