diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java index 30b6f6dd0d96..02d6eb58e645 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java @@ -27,6 +27,7 @@ import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import java.io.IOException; @@ -350,7 +351,11 @@ private synchronized Connection getOrCreateConnection() throws IOException { connection = reactor.connectionToHost(handler.getHostname(), handler.getProtocolPort(), handler); reactorExceptionHandler = new ReactorExceptionHandler(); - executor = new ReactorExecutor(reactor, Schedulers.single(), connectionId, + // Use a new single-threaded scheduler for this connection as QPID's Reactor is not thread-safe. + // Using Schedulers.single() will use the same thread for all connections in this process which + // limits the scalability of the no. of concurrent connections a single process can have. + Scheduler scheduler = Schedulers.newSingle("reactor-executor"); + executor = new ReactorExecutor(reactor, scheduler, connectionId, reactorExceptionHandler, connectionOptions.getRetry().getTryTimeout(), connectionOptions.getFullyQualifiedNamespace()); @@ -421,7 +426,6 @@ void dispose(ErrorCondition errorCondition) { } else { session.dispose(); } - subscription.dispose(); } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java index 4aef69a7a03f..f2eb85566f40 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorExecutor.java @@ -175,14 +175,6 @@ private void scheduleCompletePendingTasks() { public void close() { if (!isDisposed.getAndSet(true)) { close(true, "ReactorExecutor.close() was called."); - - try { - if (!disposeSemaphore.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS)) { - logger.info("Unable to acquire dispose reactor semaphore within timeout."); - } - } catch (InterruptedException e) { - logger.warning("Could not acquire semaphore to finish close operation.", e); - } } } @@ -193,8 +185,16 @@ private void close(boolean isUserInitialized, String reason) { if (isUserInitialized) { scheduleCompletePendingTasks(); + // wait for the scheduled pending tasks to complete + try { + if (!disposeSemaphore.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS)) { + logger.info("Unable to acquire dispose reactor semaphore within timeout."); + } + } catch (InterruptedException e) { + logger.warning("Could not acquire semaphore to finish close operation.", e); + } } - exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, isUserInitialized, reason)); + scheduler.dispose(); } }