diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md index 6eba1b8a641a..25ab4caa1013 100644 --- a/sdk/core/azure-core-amqp/CHANGELOG.md +++ b/sdk/core/azure-core-amqp/CHANGELOG.md @@ -3,13 +3,22 @@ ## 2.1.0-beta.1 (Unreleased) ### New Features + - Exposing CbsAuthorizationType. - Exposing ManagementNode that can perform management and metadata operations on an AMQP message broker. - AmqpConnection, AmqpSession, AmqpSendLink, and AmqpReceiveLink extend from AsyncCloseable. - Delivery outcomes and delivery states are added. ### Bug Fixes + - Fixed a bug where connection and sessions would not be disposed when their endpoint closed. +- Fixed a bug where ReactorExecutor did not dispose of its scheduler when "IO Sink was interrupted". + +### Dependency Updates + +- Upgraded `azure-core` from `1.16.0` to `1.17.0`. +- Upgraded `proton-j` from `0.33.4` to `0.33.8`. +- Upgraded `qpid-proton-j-extensions` from `1.2.3` to `1.2.4`. ## 2.0.6 (2021-05-24) ### Bug Fixes diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java index 4e9bf0b850a1..53d5f7a24135 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java @@ -464,17 +464,12 @@ private synchronized void closeConnectionWork() { } connection.close(); + handler.close(); final ArrayList> closingSessions = new ArrayList<>(); sessionMap.values().forEach(link -> closingSessions.add(link.isClosed())); - final Mono closedExecutor; - if (executor != null) { - closedExecutor = executor.isClosed(); - executor.close(); - } else { - closedExecutor = Mono.empty(); - } + final Mono closedExecutor = executor != null ? executor.closeAsync() : Mono.empty(); // Close all the children. final Mono closeSessionsMono = Mono.when(closingSessions) @@ -491,7 +486,6 @@ private synchronized void closeConnectionWork() { return false; }); - handler.close(); subscriptions.dispose(); })); @@ -521,10 +515,6 @@ private synchronized Connection getOrCreateConnection() throws IOException { final ReactorExceptionHandler reactorExceptionHandler = new ReactorExceptionHandler(); - reactorProvider.getReactorDispatcher().getShutdownSignal() - .subscribe(signal -> reactorExceptionHandler.onConnectionShutdown(signal), - error -> reactorExceptionHandler.onConnectionError(error)); - // Use a new single-threaded scheduler for this connection as QPID's Reactor is not thread-safe. // Using Schedulers.single() will use the same thread for all connections in this process which // limits the scalability of the no. of concurrent connections a single process can have. @@ -539,6 +529,22 @@ private synchronized Connection getOrCreateConnection() throws IOException { reactorExceptionHandler, pendingTasksDuration, connectionOptions.getFullyQualifiedNamespace()); + // To avoid inconsistent synchronization of executor, we set this field with the closeAsync method. + // It will not be kicked off until subscribed to. + final Mono executorCloseMono = executor.closeAsync(); + reactorProvider.getReactorDispatcher().getShutdownSignal() + .flatMap(signal -> { + logger.info("Shutdown signal received from reactor provider."); + reactorExceptionHandler.onConnectionShutdown(signal); + return executorCloseMono; + }) + .onErrorResume(error -> { + logger.info("Error received from reactor provider.", error); + reactorExceptionHandler.onConnectionError(error); + return executorCloseMono; + }) + .subscribe(); + executor.start(); } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java index 834477d827db..7b47dbc5c4e6 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java @@ -6,6 +6,7 @@ import com.azure.core.amqp.AmqpShutdownSignal; import com.azure.core.amqp.exception.AmqpErrorContext; import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.util.AsyncCloseable; import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; import org.apache.qpid.proton.engine.HandlerException; @@ -14,7 +15,6 @@ import reactor.core.publisher.Sinks; import reactor.core.scheduler.Scheduler; -import java.io.Closeable; import java.nio.channels.UnresolvedAddressException; import java.time.Duration; import java.util.Locale; @@ -23,7 +23,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -class ReactorExecutor implements Closeable { +/** + * Schedules the proton-j reactor to continuously run work. + */ +class ReactorExecutor implements AsyncCloseable { private static final String LOG_MESSAGE = "connectionId[{}], message[{}]"; private final ClientLogger logger = new ClientLogger(ReactorExecutor.class); @@ -51,7 +54,7 @@ class ReactorExecutor implements Closeable { /** * Starts the reactor and will begin processing any reactor events until there are no longer any left or {@link - * #close()} is called. + * #closeAsync()} is called. */ void start() { if (hasStarted.getAndSet(true)) { @@ -142,10 +145,6 @@ private void run() { } } - Mono isClosed() { - return isClosedMono.asMono(); - } - /** * Schedules the release of the current reactor after operation timeout has elapsed. */ @@ -175,26 +174,27 @@ private void scheduleCompletePendingTasks() { }, timeout.toMillis(), TimeUnit.MILLISECONDS); } - @Override - public void close() { - if (isDisposed.getAndSet(true)) { - return; - } - - if (hasStarted.get()) { - scheduleCompletePendingTasks(); - } - } - private void close(String reason) { logger.verbose("Completing close and disposing scheduler. {}", reason); - + scheduler.dispose(); isClosedMono.emitEmpty((signalType, emitResult) -> { logger.verbose("signalType[{}] emitResult[{}]: Unable to emit close event on reactor", signalType, emitResult); return false; }); exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, false, reason)); - scheduler.dispose(); + } + + @Override + public Mono closeAsync() { + if (isDisposed.getAndSet(true)) { + return isClosedMono.asMono(); + } + + if (hasStarted.get()) { + scheduleCompletePendingTasks(); + } + + return isClosedMono.asMono(); } }