diff --git a/eng/spotbugs-aggregate-report/pom.xml b/eng/spotbugs-aggregate-report/pom.xml index 3633f542a2ef..a60c9dff8963 100644 --- a/eng/spotbugs-aggregate-report/pom.xml +++ b/eng/spotbugs-aggregate-report/pom.xml @@ -117,17 +117,17 @@ com.microsoft.azure azure-eventhubs - 3.2.0-beta.1 + 3.2.0 com.microsoft.azure azure-eventhubs-eph - 3.2.0-beta.1 + 3.2.0 com.microsoft.azure azure-eventhubs-extensions - 3.2.0-beta.1 + 3.2.0 + 3.2.0 Microsoft Azure SDK for Event Hubs Event Processor Host(EPH) EPH is built on top of the Azure Event Hubs Client and provides a number of features not present in that lower layer @@ -35,7 +35,7 @@ com.microsoft.azure azure-eventhubs - 3.2.0-beta.1 + 3.2.0 com.microsoft.azure diff --git a/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml b/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml index 9f6c9b7b62a4..579a95d25662 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml +++ b/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml @@ -14,7 +14,7 @@ 4.0.0 com.microsoft.azure azure-eventhubs-extensions - 3.2.0-beta.1 + 3.2.0 Microsoft Azure SDK for Event Hubs Extensions Extensions built on Microsoft Azure Event Hubs @@ -35,7 +35,7 @@ com.microsoft.azure azure-eventhubs - 3.2.0-beta.1 + 3.2.0 org.apache.logging.log4j diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml b/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml index df696d9ac217..816b23c4c8ee 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml +++ b/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml @@ -14,7 +14,7 @@ 4.0.0 com.microsoft.azure azure-eventhubs - 3.2.0-beta.1 + 3.2.0 Microsoft Azure SDK for Event Hubs Libraries built on Microsoft Azure Event Hubs diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java index 8818a1b71193..8477c3e1aaec 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.net.URI; import java.nio.channels.UnresolvedAddressException; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; @@ -70,6 +71,27 @@ static EventHubClient createFromConnectionStringSync(final String connectionStri } + /** + * Synchronous version of {@link #createFromConnectionString(String, RetryPolicy, ScheduledExecutorService, ProxyConfiguration, Duration)}. + * + * @param connectionString The connection string to be used. See {@link ConnectionStringBuilder} to construct a connectionString. + * @param retryPolicy A custom {@link RetryPolicy} to be used when communicating with EventHub. + * @param executor An {@link ScheduledExecutorService} to run all tasks performed by {@link EventHubClient}. + * @param configuration The proxy configuration for this EventHubClient connection; {@code null} or + * {@link ProxyConfiguration#SYSTEM_DEFAULTS} if the system configured proxy settings should be used. + * @param maximumSilentTime Use {@link EventHubClientOptions#SILENT_OFF} except on recommendation from the product group. + * @return EventHubClient which can be used to create Senders and Receivers to EventHub + * @throws EventHubException If Service Bus service encountered problems during connection creation. + * @throws IOException If the underlying Proton-J layer encounter network errors. + */ + static EventHubClient createFromConnectionStringSync(final String connectionString, final RetryPolicy retryPolicy, + final ScheduledExecutorService executor, + final ProxyConfiguration configuration, + final Duration maximumSilentTime) + throws EventHubException, IOException { + return ExceptionUtil.syncWithIOException(() -> createFromConnectionString(connectionString, retryPolicy, executor, configuration, maximumSilentTime).get()); + } + /** * Factory method to create an instance of {@link EventHubClient} using the supplied connectionString. * In a normal scenario (when re-direct is not enabled) - one EventHubClient instance maps to one Connection to the Azure ServiceBus EventHubs service. @@ -126,6 +148,29 @@ static CompletableFuture createFromConnectionString( return EventHubClientImpl.create(connectionString, retryPolicy, executor, proxyConfiguration); } + /** + * Factory method to create an instance of {@link EventHubClient} using the supplied {@code connectionString}. One + * EventHubClient instance maps to one connection to the Event Hubs service. + * + *

+ * The {@link EventHubClient} created from this method creates a Sender instance internally, which is used by + * the {@link #send(EventData)} methods. + *

+ * @param connectionString The connection string to be used. See {@link ConnectionStringBuilder} to construct a connectionString. + * @param retryPolicy A custom {@link RetryPolicy} to be used when communicating with EventHub. + * @param executor An {@link ScheduledExecutorService} to run all tasks performed by {@link EventHubClient}. + * @param proxyConfiguration The proxy configuration for this EventHubClient connection; {@code null} or + * {@link ProxyConfiguration#SYSTEM_DEFAULTS} if the system configured proxy settings should be used. + * @param maximumSilentTime Use {@link EventHubClientOptions#SILENT_OFF} except on recommendation from the product group. + * @return CompletableFuture{@literal } which can be used to create Senders and Receivers to EventHub. + * @throws IOException If the underlying Proton-J layer encounter network errors. + */ + static CompletableFuture createFromConnectionString( + final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor, + final ProxyConfiguration proxyConfiguration, final Duration maximumSilentTime) throws IOException { + return EventHubClientImpl.create(connectionString, retryPolicy, executor, proxyConfiguration, maximumSilentTime); + } + /** * Factory method to create an instance of {@link EventHubClient} using the supplied namespace endpoint address, eventhub name and authentication mechanism. * In a normal scenario (when re-direct is not enabled) - one EventHubClient instance maps to one Connection to the Azure ServiceBus EventHubs service. diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java index 1bade3a979c5..f47e7381bdcd 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java @@ -10,10 +10,14 @@ * All options default to not specified (null) */ public class EventHubClientOptions { + public static final Duration SILENT_OFF = Duration.ofSeconds(0); + public static final Duration SILENT_MINIMUM = Duration.ofSeconds(30); + private Duration operationTimeout = null; private TransportType transportType = null; private RetryPolicy retryPolicy = null; private ProxyConfiguration proxyConfiguration = null; + private Duration maximumSilentTime = SILENT_OFF; /** * Create with all defaults @@ -92,6 +96,30 @@ public EventHubClientOptions setProxyConfiguration(ProxyConfiguration proxyConfi * @return Gets the proxy configuration. */ public ProxyConfiguration getProxyConfiguration() { - return proxyConfiguration; + return this.proxyConfiguration; + } + + /** + * Sets the maximum silent time, in seconds. + * Use only on recommendation from the product group. + * + * @param maximumSilentTime The time, or SILENT_OFF. Time must be at least SILENT_MINIMUM. + * @return The updated options object. + */ + public EventHubClientOptions setMaximumSilentTime(Duration maximumSilentTime) { + if (maximumSilentTime.compareTo(EventHubClientOptions.SILENT_MINIMUM) < 0) { + throw new IllegalArgumentException("Maximum silent time must be at least " + EventHubClientOptions.SILENT_MINIMUM.toMillis() + " milliseconds"); + } + this.maximumSilentTime = maximumSilentTime; + return this; + } + + /** + * Gets the maximum silent time in seconds. + * + * @return The maximum silent time, or SILENT_OFF. + */ + public Duration getMaximumSilentTime() { + return this.maximumSilentTime; } } diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java index 373fceecab2f..15235daa1ebd 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java @@ -19,6 +19,7 @@ public final class ClientConstants { public static final Symbol STORE_LOCK_LOST_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":store-lock-lost"); public static final Symbol PUBLISHER_REVOKED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":publisher-revoked"); public static final Symbol TIMEOUT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout"); + public static final Symbol WATCHDOG_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":watchdog"); public static final Symbol PROTON_IO_ERROR = Symbol.getSymbol(AmqpConstants.PROTON + ":io"); public static final Symbol TRACKING_ID_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":tracking-id"); public static final int MAX_MESSAGE_LENGTH_BYTES = 256 * 1024; @@ -38,7 +39,7 @@ public final class ClientConstants { public static final String DEFAULT_RETRY = "Default"; public static final String PRODUCT_NAME = "MSJavaClient"; // {x-version-update-start;com.microsoft.azure:azure-eventhubs;current} - public static final String CURRENT_JAVACLIENT_VERSION = "3.2.0-beta.1"; + public static final String CURRENT_JAVACLIENT_VERSION = "3.2.0"; // {x-version-update-end} public static final String PLATFORM_INFO = getPlatformInfo(); public static final String FRAMEWORK_INFO = getFrameworkInfo(); diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java index cf60b5f5b01e..d39fdc66955a 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java @@ -64,6 +64,16 @@ public static CompletableFuture create( final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ProxyConfiguration proxyConfiguration) throws IOException { + return create(connectionString, retryPolicy, executor, proxyConfiguration, EventHubClientOptions.SILENT_OFF); + } + + public static CompletableFuture create( + final String connectionString, + final RetryPolicy retryPolicy, + final ScheduledExecutorService executor, + final ProxyConfiguration proxyConfiguration, + final Duration watchdogTriggerTime) + throws IOException { if (StringUtil.isNullOrWhiteSpace(connectionString)) { throw new IllegalArgumentException("Connection string cannot be null or empty"); } @@ -72,7 +82,7 @@ public static CompletableFuture create( final ConnectionStringBuilder connStr = new ConnectionStringBuilder(connectionString); final EventHubClientImpl eventHubClient = new EventHubClientImpl(connStr.getEventHubName(), executor); - return MessagingFactory.createFromConnectionString(connectionString, retryPolicy, executor, proxyConfiguration) + return MessagingFactory.createFromConnectionString(connectionString, retryPolicy, executor, proxyConfiguration, watchdogTriggerTime) .thenApplyAsync(new Function() { @Override public EventHubClient apply(MessagingFactory factory) { @@ -103,7 +113,8 @@ public static CompletableFuture create( builder.setOperationTimeout(options.getOperationTimeout()) .setTransportType(options.getTransportType()) .setRetryPolicy(options.getRetryPolicy()) - .setProxyConfiguration(options.getProxyConfiguration()); + .setProxyConfiguration(options.getProxyConfiguration()) + .setWatchdogTriggerTime(options.getMaximumSilentTime()); } return builder.build() diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java index 5e81d25326c1..9626f541663d 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java @@ -75,6 +75,7 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver, private volatile CompletableFuture closeTimer; private int prefetchCount; private Exception lastKnownLinkError; + private volatile long lastDeliveryReceivedTime; private String linkCreationTime; // Used when looking at Java dumps, do not remove. private MessageReceiver(final MessagingFactory factory, @@ -184,6 +185,11 @@ public void onError(Exception error) { }, ClientConstants.TOKEN_REFRESH_INTERVAL, this.underlyingFactory); + + this.underlyingFactory.registerForWatchdog(this); + // Set last-received time to receiver creation time. This means that a newly-created receiver will get + // the watchdog time to receive the first message before it is declared to be failed. + this.lastDeliveryReceivedTime = Instant.now().getEpochSecond(); } // @param connection Connection on which the MessageReceiver's receive AMQP link need to be created on. @@ -310,6 +316,15 @@ public void onOpenComplete(Exception exception) { this.underlyingFactory.getRetryPolicy().resetRetryCount(this.underlyingFactory.getClientId()); + // Resetting the watchdog time here means that when the watchdog has forced a connection closed, the + // new connection gets the watchdog time to receive a first message before it is declared to be dead. + // Without this, the watchdog would continue to measure from the last message received and the new + // connection would only get the watchdog scan time, which is normally a lot shorter. Using the longer + // time avoids the situation where the network or the service is slow and the client gets trapped in + // a loop killing connections because it doesn't wait long enough for a response. + TRACE_LOGGER.info("Watchdog reset timer for new connection on receiver " + this.getClientId()); + this.lastDeliveryReceivedTime = Instant.now().getEpochSecond(); + this.nextCreditToFlow = 0; this.sendFlow(this.prefetchCount - this.prefetchedMessages.size()); @@ -380,6 +395,10 @@ private void cancelOpenTimer() { } } + public long getLastReceivedTime() { + return this.lastDeliveryReceivedTime; + } + @Override public void onReceiveComplete(Delivery delivery) { int msgSize = delivery.pending(); @@ -393,6 +412,7 @@ public void onReceiveComplete(Delivery delivery) { delivery.settle(); this.prefetchedMessages.add(message); + this.lastDeliveryReceivedTime = Instant.now().getEpochSecond(); this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId()); this.receiveWork.onEvent(); @@ -796,6 +816,8 @@ public ErrorContext getContext() { protected CompletableFuture onClose() { if (!this.getIsClosed()) { try { + this.underlyingFactory.unregisterForWatchdog(this); + this.activeClientTokenManager.cancel(); scheduleLinkCloseTimeout(TimeoutTracker.create(operationTimeout, this.receiveLink.getName())); diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java index a42b7bb9def5..23c27053cbae 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java @@ -3,9 +3,9 @@ package com.microsoft.azure.eventhubs.impl; - import com.microsoft.azure.eventhubs.CommunicationException; import com.microsoft.azure.eventhubs.ConnectionStringBuilder; +import com.microsoft.azure.eventhubs.EventHubClientOptions; import com.microsoft.azure.eventhubs.EventHubException; import com.microsoft.azure.eventhubs.ITokenProvider; import com.microsoft.azure.eventhubs.ManagedIdentityTokenProvider; @@ -41,6 +41,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -63,6 +64,14 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti private final ITokenProvider tokenProvider; private final ReactorFactory reactorFactory; + private static final long WATCHDOG_SCAN_DIVISOR = 2; + private final LinkedList watchdogReceivers; + private final Object watchdogSyncObject; + private final Duration watchdogTriggerTime; + private ScheduledFuture watchdogFuture; + private final long watchdogScanSeconds; + private boolean watchdogCleanupDone; + private Reactor reactor; private ReactorDispatcher reactorDispatcher; private Connection connection; @@ -82,7 +91,8 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ReactorFactory reactorFactory, - final ProxyConfiguration proxyConfiguration) { + final ProxyConfiguration proxyConfiguration, + final Duration watchdogTriggerTime) { super(StringUtil.getRandomString("MF"), null, executor); if (StringUtil.isNullOrWhiteSpace(hostname)) { @@ -107,6 +117,11 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti this.cbsChannelCreateLock = new Object(); this.mgmtChannelCreateLock = new Object(); + this.watchdogTriggerTime = watchdogTriggerTime; + this.watchdogScanSeconds = watchdogTriggerTime.toMillis() / MessagingFactory.WATCHDOG_SCAN_DIVISOR / 1000; + this.watchdogReceivers = new LinkedList(); + this.watchdogSyncObject = new Object(); + this.closeTask = new CompletableFuture<>(); } @@ -119,7 +134,16 @@ public static CompletableFuture createFromConnectionString( final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ProxyConfiguration proxyConfiguration) throws IOException { - return createFromConnectionString(connectionString, retryPolicy, executor, null, proxyConfiguration); + return createFromConnectionString(connectionString, retryPolicy, executor, null, proxyConfiguration, EventHubClientOptions.SILENT_OFF); + } + + public static CompletableFuture createFromConnectionString( + final String connectionString, + final RetryPolicy retryPolicy, + final ScheduledExecutorService executor, + final ProxyConfiguration proxyConfiguration, + final Duration watchdogTriggerTime) throws IOException { + return createFromConnectionString(connectionString, retryPolicy, executor, null, proxyConfiguration, watchdogTriggerTime); } public static CompletableFuture createFromConnectionString( @@ -127,7 +151,8 @@ public static CompletableFuture createFromConnectionString( final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ReactorFactory reactorFactory, - final ProxyConfiguration proxyConfiguration) throws IOException { + final ProxyConfiguration proxyConfiguration, + final Duration watchdogTriggerTime) throws IOException { final ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString); ITokenProvider tokenProvider = null; if (!StringUtil.isNullOrWhiteSpace(csb.getSharedAccessSignature())) { @@ -145,7 +170,8 @@ public static CompletableFuture createFromConnectionString( .setTransportType(csb.getTransportType()) .setRetryPolicy(retryPolicy) .setReactorFactory(reactorFactory) - .setProxyConfiguration(proxyConfiguration); + .setProxyConfiguration(proxyConfiguration) + .setWatchdogTriggerTime(watchdogTriggerTime); return builder.build(); } @@ -162,6 +188,7 @@ public static class MessagingFactoryBuilder { private RetryPolicy retryPolicy = RetryPolicy.getDefault(); private ReactorFactory reactorFactory = new ReactorFactory(); private ProxyConfiguration proxyConfiguration; + private Duration watchdogTriggerTime = EventHubClientOptions.SILENT_OFF; public MessagingFactoryBuilder(final String hostname, final ITokenProvider tokenProvider, final ScheduledExecutorService executor) { if (StringUtil.isNullOrWhiteSpace(hostname)) { @@ -206,6 +233,11 @@ public MessagingFactoryBuilder setProxyConfiguration(ProxyConfiguration proxyCon return this; } + public MessagingFactoryBuilder setWatchdogTriggerTime(Duration watchdogTriggerTime) { + this.watchdogTriggerTime = watchdogTriggerTime; + return this; + } + public CompletableFuture build() throws IOException { final MessagingFactory messagingFactory = new MessagingFactory(this.hostname, this.operationTimeout, @@ -214,7 +246,8 @@ public CompletableFuture build() throws IOException { this.retryPolicy, this.executor, this.reactorFactory, - this.proxyConfiguration); + this.proxyConfiguration, + this.watchdogTriggerTime); return MessagingFactory.factoryStartup(messagingFactory); } } @@ -222,6 +255,8 @@ public CompletableFuture build() throws IOException { private static CompletableFuture factoryStartup(MessagingFactory messagingFactory) throws IOException { messagingFactory.createConnection(); + messagingFactory.startWatchdog(); + final Timer timer = new Timer(messagingFactory); messagingFactory.openTimer = timer.schedule( new Runnable() { @@ -248,6 +283,96 @@ public void run() { return messagingFactory.open; } + public void registerForWatchdog(final MessageReceiver rcvr) { + if (this.watchdogTriggerTime.compareTo(EventHubClientOptions.SILENT_OFF) > 0) { + TRACE_LOGGER.info("Registering for watchdog: " + rcvr.getClientId()); + synchronized (this.watchdogSyncObject) { + this.watchdogReceivers.add(rcvr); + } + } + // else ignore registration if watchdog is off + } + + public void unregisterForWatchdog(final MessageReceiver rcvr) { + if (this.watchdogTriggerTime.compareTo(EventHubClientOptions.SILENT_OFF) > 0) { + TRACE_LOGGER.info("Unregistering for watchdog: " + rcvr.getClientId()); + synchronized (this.watchdogSyncObject) { + this.watchdogReceivers.remove(rcvr); + } + } + } + + private void startWatchdog() { + if (this.watchdogTriggerTime.compareTo(EventHubClientOptions.SILENT_OFF) > 0) { + TRACE_LOGGER.info("Watchdog scheduling first run in " + this.watchdogScanSeconds + " seconds"); + this.watchdogFuture = this.executor.schedule(new WatchDog(), this.watchdogScanSeconds, TimeUnit.SECONDS); + } else { + TRACE_LOGGER.info("Watchdog is OFF"); + } + } + + private class WatchDog implements Runnable { + @Override + public void run() { + TRACE_LOGGER.debug("Watchdog run"); + if (MessagingFactory.this.getIsClosingOrClosed()) { + return; + } + + LinkedList copiedList = null; + synchronized (MessagingFactory.this.watchdogSyncObject) { + copiedList = new LinkedList(MessagingFactory.this.watchdogReceivers); + } + if (!copiedList.isEmpty()) { + boolean anyReceiverIsAlive = false; + final long longestAgoAllowable = Instant.now().getEpochSecond() + - (MessagingFactory.this.watchdogTriggerTime.toMillis() / 1000); + + for (MessageReceiver rcvr : copiedList) { + TRACE_LOGGER.debug("Watchdog checking receiver " + rcvr.getClientId() + " last: " + + rcvr.getLastReceivedTime() + " allowable: " + longestAgoAllowable); + if (!rcvr.getIsClosingOrClosed() && (rcvr.getLastReceivedTime() >= longestAgoAllowable)) { + anyReceiverIsAlive = true; + // Found one live receiver, no need to check the rest. + break; + } + } + + if (!anyReceiverIsAlive && !MessagingFactory.this.getIsClosingOrClosed()) { + TRACE_LOGGER.warn("Watchdog forcing connection closed"); + ErrorCondition suspect = new ErrorCondition(ClientConstants.WATCHDOG_ERROR, + "receiver watchdog has fired, all receivers silent"); + MessagingFactory.this.watchdogCleanupDone = false; + MessagingFactory.this.connection.setCondition(suspect); + MessagingFactory.this.connection.close(); + // If the remote host is still responding at the TCP level, then the socket will + // close normally and cleanup/recreation will happen automatically. However, if it + // isn't, then we must call onConnectionError here in order to force cleanup and + // recreation. + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } + if (!MessagingFactory.this.watchdogCleanupDone) { + TRACE_LOGGER.warn("Watchdog forcing cleanup"); + MessagingFactory.this.onConnectionError(suspect); + } else { + TRACE_LOGGER.info("Watchdog cleanup already in progress"); + } + } + } + + synchronized (MessagingFactory.this.watchdogSyncObject) { + if (!MessagingFactory.this.getIsClosingOrClosed() && !MessagingFactory.this.watchdogFuture.isCancelled()) { + TRACE_LOGGER.debug("Watchdog scheduling next run"); + MessagingFactory.this.watchdogFuture = MessagingFactory.this.executor.schedule(this, MessagingFactory.this.watchdogScanSeconds, TimeUnit.SECONDS); + } else { + TRACE_LOGGER.info("Watchdog stopping due to MessagingFactory close"); + } + } + } + } + @Override public String getHostName() { return this.hostName; @@ -362,6 +487,8 @@ public void onOpenComplete(Exception exception) { @Override public void onConnectionError(ErrorCondition error) { + this.watchdogCleanupDone = true; + if (TRACE_LOGGER.isWarnEnabled()) { TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError messagingFactory[%s], hostname[%s], error[%s]", this.getClientId(), @@ -491,6 +618,12 @@ private void onReactorError(Exception cause) { @Override protected CompletableFuture onClose() { if (!this.getIsClosed()) { + synchronized (this.watchdogSyncObject) { + if (this.watchdogFuture != null) { + this.watchdogFuture.cancel(true); + } + } + final Timer timer = new Timer(this); this.closeTimer = timer.schedule(new Runnable() { @Override diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java index 216aa77e3674..39269d961e3f 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java @@ -7,6 +7,7 @@ import com.microsoft.azure.eventhubs.ConnectionStringBuilder; import com.microsoft.azure.eventhubs.EventData; import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.EventHubClientOptions; import com.microsoft.azure.eventhubs.EventPosition; import com.microsoft.azure.eventhubs.PartitionReceiveHandler; import com.microsoft.azure.eventhubs.PartitionReceiver; @@ -125,10 +126,12 @@ public void verifyThreadReleaseOnMsgFactoryOpenError() throws Exception { try { final CompletableFuture openFuture = MessagingFactory.createFromConnectionString( - connStr.toString(), null, + connStr.toString(), + null, executor, networkOutageSimulator, - null); + null, + EventHubClientOptions.SILENT_OFF); try { openFuture.get(); Assert.fail();