diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java index 6d87554add01..9d7112ced1cf 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java @@ -28,7 +28,6 @@ import com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection; import com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection; import com.azure.messaging.eventhubs.implementation.EventHubSharedKeyCredential; -import java.util.Map; import reactor.core.publisher.Mono; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @@ -36,6 +35,7 @@ import java.net.InetSocketAddress; import java.net.Proxy; import java.util.Locale; +import java.util.Map; import java.util.Objects; import java.util.ServiceLoader; @@ -459,7 +459,7 @@ EventHubAsyncClient buildAsyncClient() { } if (scheduler == null) { - scheduler = Schedulers.newElastic("event-hubs"); + scheduler = Schedulers.elastic(); } final MessageSerializer messageSerializer = new EventHubMessageSerializer(); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventProcessorClientAggregateEventsSample.java b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventProcessorClientAggregateEventsSample.java index 30579d62796e..c7d6a6e5e099 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventProcessorClientAggregateEventsSample.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventProcessorClientAggregateEventsSample.java @@ -98,7 +98,7 @@ public static void main(String[] args) throws Exception { */ private static Mono generateEvents(AtomicBoolean isRunning) { final Logger logger = LoggerFactory.getLogger("Producer"); - final Scheduler scheduler = Schedulers.newElastic("produce"); + final Scheduler scheduler = Schedulers.elastic(); final Duration operationTimeout = Duration.ofSeconds(5); final String[] machineIds = new String[]{"2A", "9B", "6C"}; final Random random = new Random(); @@ -129,10 +129,11 @@ private static Mono generateEvents(AtomicBoolean isRunning) { return client.send(batch); }).block(operationTimeout); } - }).doFinally(signal -> { - logger.info("Disposing of producer."); - client.close(); - }).subscribeOn(scheduler); + }).subscribeOn(scheduler) + .doFinally(signal -> { + logger.info("Disposing of producer."); + client.close(); + }); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/IntegrationTestBase.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/IntegrationTestBase.java index e6f1fc55ec33..de157d90a047 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/IntegrationTestBase.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/IntegrationTestBase.java @@ -21,6 +21,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInfo; import org.mockito.Mockito; +import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; import java.io.Closeable; @@ -51,6 +52,7 @@ public abstract class IntegrationTestBase extends TestBase { private ConnectionStringProperties properties; private String testName; + private final Scheduler scheduler = Schedulers.newParallel("eh-integration"); protected IntegrationTestBase(ClientLogger logger) { this.logger = logger; @@ -73,6 +75,7 @@ public void setupTest(TestInfo testInfo) { @AfterEach public void teardownTest(TestInfo testInfo) { logger.info("[{}]: Performing test clean-up.", testInfo.getDisplayName()); + scheduler.dispose(); afterTest(); // Tear down any inline mocks to avoid memory leaks. @@ -161,7 +164,7 @@ protected EventHubClientBuilder createBuilder(boolean useCredentials) { .proxyOptions(ProxyOptions.SYSTEM_DEFAULTS) .retry(RETRY_OPTIONS) .transportType(AmqpTransportType.AMQP) - .scheduler(Schedulers.newParallel("eh-integration")); + .scheduler(scheduler); if (useCredentials) { final String fqdn = getFullyQualifiedDomainName();