From b711fac081ede42b7d941f6672d20e58c098a575 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Fri, 6 Mar 2020 14:23:02 -0800 Subject: [PATCH 01/18] More client-level watchdog changes --- .../microsoft-azure-eventhubs/pom.xml | 2 +- .../eventhubs/EventHubClientOptions.java | 48 +++++- .../azure/eventhubs/impl/ClientConstants.java | 1 + .../eventhubs/impl/EventHubClientImpl.java | 18 ++- .../azure/eventhubs/impl/MessageReceiver.java | 22 +++ .../eventhubs/impl/MessagingFactory.java | 137 +++++++++++++++++- 6 files changed, 218 insertions(+), 10 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml b/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml index 478167350d18..dc1239bf2180 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml +++ b/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml @@ -14,7 +14,7 @@ 4.0.0 com.microsoft.azure azure-eventhubs - 3.1.0 + 9.9.0 Microsoft Azure SDK for Event Hubs Libraries built on Microsoft Azure Event Hubs diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java index 1bade3a979c5..629b6e4c3a3e 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java @@ -10,10 +10,15 @@ * All options default to not specified (null) */ public class EventHubClientOptions { + public static final int WATCHDOG_OFF = 0; + public static final int WATCHDOG_SCAN_DEFAULT = 10; // seconds + private Duration operationTimeout = null; private TransportType transportType = null; private RetryPolicy retryPolicy = null; private ProxyConfiguration proxyConfiguration = null; + private int watchdogTimeoutSeconds = WATCHDOG_OFF; + private int watchdogScanSeconds = WATCHDOG_SCAN_DEFAULT; /** * Create with all defaults @@ -92,6 +97,47 @@ public EventHubClientOptions setProxyConfiguration(ProxyConfiguration proxyConfi * @return Gets the proxy configuration. */ public ProxyConfiguration getProxyConfiguration() { - return proxyConfiguration; + return this.proxyConfiguration; + } + + /** + * Sets the watchdog timeout in seconds. + * + * @param watchdogTimeoutSeconds The timeout in seconds, or WATCHDOG_OFF. + * @return The updated options object. + */ + public EventHubClientOptions setWatchdogTimeout(int watchdogTimeoutSeconds) { + this.watchdogTimeoutSeconds = watchdogTimeoutSeconds; + return this; + } + + /** + * Gets the watchdog timeout in seconds. + * + * @return The watchdog timeout. + */ + public int getWatchdogTimeout() { + return this.watchdogTimeoutSeconds; + } + + /** + * Sets the watchdog scan period in seconds. Ignored if the watchdog timeout is WATCHDOG_OFF. + * Defaults to WATCHDOG_SCAN_DEFAULT. + * + * @param watchdogScanSeconds The watchdog scan period in seconds + * @return The updated options object. + */ + public EventHubClientOptions setWatchdogScan(int watchdogScanSeconds) { + this.watchdogScanSeconds = watchdogScanSeconds; + return this; + } + + /** + * Gets the watchdog scan period in seconds. + * + * @return The watchdog scan period. + */ + public int getWatchdogScan() { + return this.watchdogScanSeconds; } } diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java index ac1fc7482658..a2714405af7e 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java @@ -19,6 +19,7 @@ public final class ClientConstants { public static final Symbol STORE_LOCK_LOST_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":store-lock-lost"); public static final Symbol PUBLISHER_REVOKED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":publisher-revoked"); public static final Symbol TIMEOUT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout"); + public static final Symbol WATCHDOG_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":watchdog"); public static final Symbol PROTON_IO_ERROR = Symbol.getSymbol(AmqpConstants.PROTON + ":io"); public static final Symbol TRACKING_ID_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":tracking-id"); public static final int MAX_MESSAGE_LENGTH_BYTES = 256 * 1024; diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java index cf60b5f5b01e..5344c8f1593e 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java @@ -64,6 +64,18 @@ public static CompletableFuture create( final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ProxyConfiguration proxyConfiguration) throws IOException { + return create(connectionString, retryPolicy, executor, proxyConfiguration, EventHubClientOptions.WATCHDOG_OFF, + EventHubClientOptions.WATCHDOG_SCAN_DEFAULT); + } + + public static CompletableFuture create( + final String connectionString, + final RetryPolicy retryPolicy, + final ScheduledExecutorService executor, + final ProxyConfiguration proxyConfiguration, + final int watchdogTimeoutSeconds, + final int watchdogScanSeconds) + throws IOException { if (StringUtil.isNullOrWhiteSpace(connectionString)) { throw new IllegalArgumentException("Connection string cannot be null or empty"); } @@ -72,7 +84,7 @@ public static CompletableFuture create( final ConnectionStringBuilder connStr = new ConnectionStringBuilder(connectionString); final EventHubClientImpl eventHubClient = new EventHubClientImpl(connStr.getEventHubName(), executor); - return MessagingFactory.createFromConnectionString(connectionString, retryPolicy, executor, proxyConfiguration) + return MessagingFactory.createFromConnectionString(connectionString, retryPolicy, executor, proxyConfiguration, watchdogTimeoutSeconds, watchdogScanSeconds) .thenApplyAsync(new Function() { @Override public EventHubClient apply(MessagingFactory factory) { @@ -103,7 +115,9 @@ public static CompletableFuture create( builder.setOperationTimeout(options.getOperationTimeout()) .setTransportType(options.getTransportType()) .setRetryPolicy(options.getRetryPolicy()) - .setProxyConfiguration(options.getProxyConfiguration()); + .setProxyConfiguration(options.getProxyConfiguration()) + .setWatchdogTimeout(options.getWatchdogTimeout()) + .setWatchdogScan(options.getWatchdogScan()); } return builder.build() diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java index 5e81d25326c1..9626f541663d 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java @@ -75,6 +75,7 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver, private volatile CompletableFuture closeTimer; private int prefetchCount; private Exception lastKnownLinkError; + private volatile long lastDeliveryReceivedTime; private String linkCreationTime; // Used when looking at Java dumps, do not remove. private MessageReceiver(final MessagingFactory factory, @@ -184,6 +185,11 @@ public void onError(Exception error) { }, ClientConstants.TOKEN_REFRESH_INTERVAL, this.underlyingFactory); + + this.underlyingFactory.registerForWatchdog(this); + // Set last-received time to receiver creation time. This means that a newly-created receiver will get + // the watchdog time to receive the first message before it is declared to be failed. + this.lastDeliveryReceivedTime = Instant.now().getEpochSecond(); } // @param connection Connection on which the MessageReceiver's receive AMQP link need to be created on. @@ -310,6 +316,15 @@ public void onOpenComplete(Exception exception) { this.underlyingFactory.getRetryPolicy().resetRetryCount(this.underlyingFactory.getClientId()); + // Resetting the watchdog time here means that when the watchdog has forced a connection closed, the + // new connection gets the watchdog time to receive a first message before it is declared to be dead. + // Without this, the watchdog would continue to measure from the last message received and the new + // connection would only get the watchdog scan time, which is normally a lot shorter. Using the longer + // time avoids the situation where the network or the service is slow and the client gets trapped in + // a loop killing connections because it doesn't wait long enough for a response. + TRACE_LOGGER.info("Watchdog reset timer for new connection on receiver " + this.getClientId()); + this.lastDeliveryReceivedTime = Instant.now().getEpochSecond(); + this.nextCreditToFlow = 0; this.sendFlow(this.prefetchCount - this.prefetchedMessages.size()); @@ -380,6 +395,10 @@ private void cancelOpenTimer() { } } + public long getLastReceivedTime() { + return this.lastDeliveryReceivedTime; + } + @Override public void onReceiveComplete(Delivery delivery) { int msgSize = delivery.pending(); @@ -393,6 +412,7 @@ public void onReceiveComplete(Delivery delivery) { delivery.settle(); this.prefetchedMessages.add(message); + this.lastDeliveryReceivedTime = Instant.now().getEpochSecond(); this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId()); this.receiveWork.onEvent(); @@ -796,6 +816,8 @@ public ErrorContext getContext() { protected CompletableFuture onClose() { if (!this.getIsClosed()) { try { + this.underlyingFactory.unregisterForWatchdog(this); + this.activeClientTokenManager.cancel(); scheduleLinkCloseTimeout(TimeoutTracker.create(operationTimeout, this.receiveLink.getName())); diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java index a42b7bb9def5..d1fca412840d 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java @@ -3,9 +3,9 @@ package com.microsoft.azure.eventhubs.impl; - import com.microsoft.azure.eventhubs.CommunicationException; import com.microsoft.azure.eventhubs.ConnectionStringBuilder; +import com.microsoft.azure.eventhubs.EventHubClientOptions; import com.microsoft.azure.eventhubs.EventHubException; import com.microsoft.azure.eventhubs.ITokenProvider; import com.microsoft.azure.eventhubs.ManagedIdentityTokenProvider; @@ -41,6 +41,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -63,6 +64,12 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti private final ITokenProvider tokenProvider; private final ReactorFactory reactorFactory; + private final LinkedList watchdogReceivers; + private final Object watchdogSyncObject; + private final long watchdogTimeoutSeconds; + private ScheduledFuture watchdogFuture; + private final long watchdogScanSeconds; + private Reactor reactor; private ReactorDispatcher reactorDispatcher; private Connection connection; @@ -82,7 +89,9 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ReactorFactory reactorFactory, - final ProxyConfiguration proxyConfiguration) { + final ProxyConfiguration proxyConfiguration, + final int watchdogTimeoutSeconds, + final int watchdogScanSeconds) { super(StringUtil.getRandomString("MF"), null, executor); if (StringUtil.isNullOrWhiteSpace(hostname)) { @@ -107,6 +116,11 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti this.cbsChannelCreateLock = new Object(); this.mgmtChannelCreateLock = new Object(); + this.watchdogTimeoutSeconds = watchdogTimeoutSeconds; + this.watchdogScanSeconds = watchdogScanSeconds; + this.watchdogReceivers = new LinkedList(); + this.watchdogSyncObject = new Object(); + this.closeTask = new CompletableFuture<>(); } @@ -119,7 +133,18 @@ public static CompletableFuture createFromConnectionString( final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ProxyConfiguration proxyConfiguration) throws IOException { - return createFromConnectionString(connectionString, retryPolicy, executor, null, proxyConfiguration); + return createFromConnectionString(connectionString, retryPolicy, executor, null, proxyConfiguration, EventHubClientOptions.WATCHDOG_OFF, + EventHubClientOptions.WATCHDOG_SCAN_DEFAULT); + } + + public static CompletableFuture createFromConnectionString( + final String connectionString, + final RetryPolicy retryPolicy, + final ScheduledExecutorService executor, + final ProxyConfiguration proxyConfiguration, + final int watchdogTimeoutSeconds, + final int watchdogScanSeconds) throws IOException { + return createFromConnectionString(connectionString, retryPolicy, executor, null, proxyConfiguration, watchdogTimeoutSeconds, watchdogScanSeconds); } public static CompletableFuture createFromConnectionString( @@ -127,7 +152,9 @@ public static CompletableFuture createFromConnectionString( final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ReactorFactory reactorFactory, - final ProxyConfiguration proxyConfiguration) throws IOException { + final ProxyConfiguration proxyConfiguration, + final int watchdogTimeoutSeconds, + final int watchdogScanSeconds) throws IOException { final ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString); ITokenProvider tokenProvider = null; if (!StringUtil.isNullOrWhiteSpace(csb.getSharedAccessSignature())) { @@ -145,7 +172,9 @@ public static CompletableFuture createFromConnectionString( .setTransportType(csb.getTransportType()) .setRetryPolicy(retryPolicy) .setReactorFactory(reactorFactory) - .setProxyConfiguration(proxyConfiguration); + .setProxyConfiguration(proxyConfiguration) + .setWatchdogTimeout(watchdogTimeoutSeconds) + .setWatchdogScan(watchdogScanSeconds); return builder.build(); } @@ -162,6 +191,8 @@ public static class MessagingFactoryBuilder { private RetryPolicy retryPolicy = RetryPolicy.getDefault(); private ReactorFactory reactorFactory = new ReactorFactory(); private ProxyConfiguration proxyConfiguration; + private int watchdogTimeoutSeconds = EventHubClientOptions.WATCHDOG_OFF; + private int watchdogScanSeconds = EventHubClientOptions.WATCHDOG_SCAN_DEFAULT; public MessagingFactoryBuilder(final String hostname, final ITokenProvider tokenProvider, final ScheduledExecutorService executor) { if (StringUtil.isNullOrWhiteSpace(hostname)) { @@ -206,6 +237,16 @@ public MessagingFactoryBuilder setProxyConfiguration(ProxyConfiguration proxyCon return this; } + public MessagingFactoryBuilder setWatchdogTimeout(int watchdogTimeoutSeconds) { + this.watchdogTimeoutSeconds = watchdogTimeoutSeconds; + return this; + } + + public MessagingFactoryBuilder setWatchdogScan(int watchdogScanSeconds) { + this.watchdogScanSeconds = watchdogScanSeconds; + return this; + } + public CompletableFuture build() throws IOException { final MessagingFactory messagingFactory = new MessagingFactory(this.hostname, this.operationTimeout, @@ -214,7 +255,9 @@ public CompletableFuture build() throws IOException { this.retryPolicy, this.executor, this.reactorFactory, - this.proxyConfiguration); + this.proxyConfiguration, + this.watchdogTimeoutSeconds, + this.watchdogScanSeconds); return MessagingFactory.factoryStartup(messagingFactory); } } @@ -248,6 +291,82 @@ public void run() { return messagingFactory.open; } + public void registerForWatchdog(final MessageReceiver rcvr) { + if (this.watchdogTimeoutSeconds > EventHubClientOptions.WATCHDOG_OFF) { + TRACE_LOGGER.info("Registering for watchdog: " + rcvr.getClientId()); + synchronized (this.watchdogSyncObject) { + this.watchdogReceivers.add(rcvr); + + if (this.watchdogFuture == null) { + TRACE_LOGGER.info("Scheduling first watchdog"); + this.watchdogFuture = this.executor.schedule(new WatchDog(), this.watchdogScanSeconds, TimeUnit.SECONDS); + } + } + } + // else ignore registration if watchdog is off + } + + public void unregisterForWatchdog(final MessageReceiver rcvr) { + TRACE_LOGGER.info("Unregistering for watchdog: " + rcvr.getClientId()); + synchronized (this.watchdogSyncObject) { + this.watchdogReceivers.remove(rcvr); + + if (this.watchdogReceivers.isEmpty() && (this.watchdogFuture != null)) { + TRACE_LOGGER.info("Shutting down watchdog"); + this.watchdogFuture.cancel(false); + this.watchdogFuture = null; + } + } + } + + private class WatchDog implements Runnable { + @Override + public void run() { + TRACE_LOGGER.info("watchdog run"); + if (MessagingFactory.this.getIsClosingOrClosed()) { + return; + } + if (MessagingFactory.this.watchdogTimeoutSeconds <= EventHubClientOptions.WATCHDOG_OFF) { + // TODO log an error, should never get here + return; + } + + boolean silentReceiverDetected = false; + String silentClientId = "UNKNOWN"; + final long longestAgoAllowable = Instant.now().getEpochSecond() - MessagingFactory.this.watchdogTimeoutSeconds; + synchronized (MessagingFactory.this.watchdogSyncObject) { + for (MessageReceiver rcvr : MessagingFactory.this.watchdogReceivers) { + TRACE_LOGGER.info("Watchdog checking receiver " + rcvr.getClientId() + " last: " + rcvr.getLastReceivedTime() + " allowable: " + longestAgoAllowable); + if (!rcvr.getIsClosingOrClosed() && (rcvr.getLastReceivedTime() <= longestAgoAllowable)) { + silentReceiverDetected = true; + silentClientId = rcvr.getClientId(); + TRACE_LOGGER.info("Watchdog declaring silence on " + silentClientId); + // TODO: is one dead receiver enough to declare the connection bad? + break; + } + } + } + + if (silentReceiverDetected && !MessagingFactory.this.getIsClosingOrClosed()) { + TRACE_LOGGER.info("Watchdog forcing connection closed"); + ErrorCondition suspect = new ErrorCondition(ClientConstants.WATCHDOG_ERROR, "receiver watchdog has fired on " + silentClientId); + MessagingFactory.this.connection.setCondition(suspect); + MessagingFactory.this.connection.close(); + // TODO add delay here + MessagingFactory.this.onConnectionError(suspect); + } + + synchronized (MessagingFactory.this.watchdogSyncObject) { + if (!MessagingFactory.this.getIsClosingOrClosed() && + !MessagingFactory.this.watchdogReceivers.isEmpty() && + !MessagingFactory.this.watchdogFuture.isCancelled()) { + TRACE_LOGGER.info("Watchdog scheduling next run"); + MessagingFactory.this.watchdogFuture = MessagingFactory.this.executor.schedule(this, MessagingFactory.this.watchdogScanSeconds, TimeUnit.SECONDS); + } + } + } + } + @Override public String getHostName() { return this.hostName; @@ -491,6 +610,12 @@ private void onReactorError(Exception cause) { @Override protected CompletableFuture onClose() { if (!this.getIsClosed()) { + synchronized (this.watchdogSyncObject) { + if (this.watchdogFuture != null) { + this.watchdogFuture.cancel(true); + } + } + final Timer timer = new Timer(this); this.closeTimer = timer.schedule(new Runnable() { @Override From 55d9a84fb994054e432a607edad64b8e45c22302 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Tue, 14 Apr 2020 18:00:15 -0700 Subject: [PATCH 02/18] Improve TODO comments --- .../microsoft/azure/eventhubs/impl/MessagingFactory.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java index d1fca412840d..7f7ec44d20f1 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java @@ -338,10 +338,11 @@ public void run() { for (MessageReceiver rcvr : MessagingFactory.this.watchdogReceivers) { TRACE_LOGGER.info("Watchdog checking receiver " + rcvr.getClientId() + " last: " + rcvr.getLastReceivedTime() + " allowable: " + longestAgoAllowable); if (!rcvr.getIsClosingOrClosed() && (rcvr.getLastReceivedTime() <= longestAgoAllowable)) { + // TODO: is one dead receiver enough to declare the connection bad? + // Or should we wait for all receivers to be silent? silentReceiverDetected = true; silentClientId = rcvr.getClientId(); TRACE_LOGGER.info("Watchdog declaring silence on " + silentClientId); - // TODO: is one dead receiver enough to declare the connection bad? break; } } @@ -352,7 +353,11 @@ public void run() { ErrorCondition suspect = new ErrorCondition(ClientConstants.WATCHDOG_ERROR, "receiver watchdog has fired on " + silentClientId); MessagingFactory.this.connection.setCondition(suspect); MessagingFactory.this.connection.close(); - // TODO add delay here + // TODO TODO TODO + // If the remote host is still responding at the TCP level, then the socket will close normally + // and cleanup will happen automatically. However, if it isn't, then we must call onConnectionError + // here in order to force cleanup. Before this watchdog is released, must add timeout mechanism here + // that waits for some period and then forces cleanup if it has not happened already. MessagingFactory.this.onConnectionError(suspect); } From 9dfd9ce4fe4362e10596d1e9a06b27fb2de7366b Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Thu, 16 Apr 2020 16:25:34 -0700 Subject: [PATCH 03/18] Most changes from review --- .../eventhubs/EventHubClientOptions.java | 45 +++------ .../eventhubs/impl/EventHubClientImpl.java | 11 +-- .../eventhubs/impl/MessagingFactory.java | 95 ++++++++----------- 3 files changed, 54 insertions(+), 97 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java index 629b6e4c3a3e..5247fed277b0 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java @@ -11,14 +11,13 @@ */ public class EventHubClientOptions { public static final int WATCHDOG_OFF = 0; - public static final int WATCHDOG_SCAN_DEFAULT = 10; // seconds + public static final int WATCHDOG_MINIMUM_SECONDS = 30; private Duration operationTimeout = null; private TransportType transportType = null; private RetryPolicy retryPolicy = null; private ProxyConfiguration proxyConfiguration = null; - private int watchdogTimeoutSeconds = WATCHDOG_OFF; - private int watchdogScanSeconds = WATCHDOG_SCAN_DEFAULT; + private int watchdogTriggerSeconds = WATCHDOG_OFF; /** * Create with all defaults @@ -101,43 +100,25 @@ public ProxyConfiguration getProxyConfiguration() { } /** - * Sets the watchdog timeout in seconds. + * Sets the watchdog trigger time in seconds. * - * @param watchdogTimeoutSeconds The timeout in seconds, or WATCHDOG_OFF. + * @param watchdogTriggerSeconds The time in seconds, or WATCHDOG_OFF. Time must be at least WATCHDOG_MINIMUM_SECONDS. * @return The updated options object. */ - public EventHubClientOptions setWatchdogTimeout(int watchdogTimeoutSeconds) { - this.watchdogTimeoutSeconds = watchdogTimeoutSeconds; + public EventHubClientOptions setWatchdogTriggerTime(int watchdogTriggerSeconds) { + if (watchdogTriggerSeconds < EventHubClientOptions.WATCHDOG_MINIMUM_SECONDS) { + throw new IllegalArgumentException("Watchdog trigger time must be at least " + EventHubClientOptions.WATCHDOG_MINIMUM_SECONDS + " seconds"); + } + this.watchdogTriggerSeconds = watchdogTriggerSeconds; return this; } /** - * Gets the watchdog timeout in seconds. + * Gets the watchdog trigger time in seconds. * - * @return The watchdog timeout. + * @return The watchdog trigger time, or WATCHDOG_OFF. */ - public int getWatchdogTimeout() { - return this.watchdogTimeoutSeconds; - } - - /** - * Sets the watchdog scan period in seconds. Ignored if the watchdog timeout is WATCHDOG_OFF. - * Defaults to WATCHDOG_SCAN_DEFAULT. - * - * @param watchdogScanSeconds The watchdog scan period in seconds - * @return The updated options object. - */ - public EventHubClientOptions setWatchdogScan(int watchdogScanSeconds) { - this.watchdogScanSeconds = watchdogScanSeconds; - return this; - } - - /** - * Gets the watchdog scan period in seconds. - * - * @return The watchdog scan period. - */ - public int getWatchdogScan() { - return this.watchdogScanSeconds; + public int getWatchdogTriggerTime() { + return this.watchdogTriggerSeconds; } } diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java index 5344c8f1593e..10087a0492a6 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java @@ -64,8 +64,7 @@ public static CompletableFuture create( final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ProxyConfiguration proxyConfiguration) throws IOException { - return create(connectionString, retryPolicy, executor, proxyConfiguration, EventHubClientOptions.WATCHDOG_OFF, - EventHubClientOptions.WATCHDOG_SCAN_DEFAULT); + return create(connectionString, retryPolicy, executor, proxyConfiguration, EventHubClientOptions.WATCHDOG_OFF); } public static CompletableFuture create( @@ -73,8 +72,7 @@ public static CompletableFuture create( final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ProxyConfiguration proxyConfiguration, - final int watchdogTimeoutSeconds, - final int watchdogScanSeconds) + final int watchdogTriggerSeconds) throws IOException { if (StringUtil.isNullOrWhiteSpace(connectionString)) { throw new IllegalArgumentException("Connection string cannot be null or empty"); @@ -84,7 +82,7 @@ public static CompletableFuture create( final ConnectionStringBuilder connStr = new ConnectionStringBuilder(connectionString); final EventHubClientImpl eventHubClient = new EventHubClientImpl(connStr.getEventHubName(), executor); - return MessagingFactory.createFromConnectionString(connectionString, retryPolicy, executor, proxyConfiguration, watchdogTimeoutSeconds, watchdogScanSeconds) + return MessagingFactory.createFromConnectionString(connectionString, retryPolicy, executor, proxyConfiguration, watchdogTriggerSeconds) .thenApplyAsync(new Function() { @Override public EventHubClient apply(MessagingFactory factory) { @@ -116,8 +114,7 @@ public static CompletableFuture create( .setTransportType(options.getTransportType()) .setRetryPolicy(options.getRetryPolicy()) .setProxyConfiguration(options.getProxyConfiguration()) - .setWatchdogTimeout(options.getWatchdogTimeout()) - .setWatchdogScan(options.getWatchdogScan()); + .setWatchdogTriggerTime(options.getWatchdogTriggerTime()); } return builder.build() diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java index 7f7ec44d20f1..8a0886b8a105 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java @@ -64,9 +64,10 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti private final ITokenProvider tokenProvider; private final ReactorFactory reactorFactory; + private static final long WATCHDOG_SCAN_DIVISOR = 2; private final LinkedList watchdogReceivers; private final Object watchdogSyncObject; - private final long watchdogTimeoutSeconds; + private final long watchdogTriggerSeconds; private ScheduledFuture watchdogFuture; private final long watchdogScanSeconds; @@ -90,8 +91,7 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti final ScheduledExecutorService executor, final ReactorFactory reactorFactory, final ProxyConfiguration proxyConfiguration, - final int watchdogTimeoutSeconds, - final int watchdogScanSeconds) { + final int watchdogTimeoutSeconds) { super(StringUtil.getRandomString("MF"), null, executor); if (StringUtil.isNullOrWhiteSpace(hostname)) { @@ -116,10 +116,14 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti this.cbsChannelCreateLock = new Object(); this.mgmtChannelCreateLock = new Object(); - this.watchdogTimeoutSeconds = watchdogTimeoutSeconds; - this.watchdogScanSeconds = watchdogScanSeconds; + this.watchdogTriggerSeconds = watchdogTimeoutSeconds; + this.watchdogScanSeconds = watchdogTriggerSeconds / MessagingFactory.WATCHDOG_SCAN_DIVISOR; this.watchdogReceivers = new LinkedList(); this.watchdogSyncObject = new Object(); + if (this.watchdogTriggerSeconds > EventHubClientOptions.WATCHDOG_OFF) { + TRACE_LOGGER.info("Watchdog scheduling first run"); + this.watchdogFuture = this.executor.schedule(new WatchDog(), this.watchdogScanSeconds, TimeUnit.SECONDS); + } this.closeTask = new CompletableFuture<>(); } @@ -133,8 +137,7 @@ public static CompletableFuture createFromConnectionString( final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ProxyConfiguration proxyConfiguration) throws IOException { - return createFromConnectionString(connectionString, retryPolicy, executor, null, proxyConfiguration, EventHubClientOptions.WATCHDOG_OFF, - EventHubClientOptions.WATCHDOG_SCAN_DEFAULT); + return createFromConnectionString(connectionString, retryPolicy, executor, null, proxyConfiguration, EventHubClientOptions.WATCHDOG_OFF); } public static CompletableFuture createFromConnectionString( @@ -142,9 +145,8 @@ public static CompletableFuture createFromConnectionString( final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ProxyConfiguration proxyConfiguration, - final int watchdogTimeoutSeconds, - final int watchdogScanSeconds) throws IOException { - return createFromConnectionString(connectionString, retryPolicy, executor, null, proxyConfiguration, watchdogTimeoutSeconds, watchdogScanSeconds); + final int watchdogTriggerSeconds) throws IOException { + return createFromConnectionString(connectionString, retryPolicy, executor, null, proxyConfiguration, watchdogTriggerSeconds); } public static CompletableFuture createFromConnectionString( @@ -153,8 +155,7 @@ public static CompletableFuture createFromConnectionString( final ScheduledExecutorService executor, final ReactorFactory reactorFactory, final ProxyConfiguration proxyConfiguration, - final int watchdogTimeoutSeconds, - final int watchdogScanSeconds) throws IOException { + final int watchdogTriggerSeconds) throws IOException { final ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString); ITokenProvider tokenProvider = null; if (!StringUtil.isNullOrWhiteSpace(csb.getSharedAccessSignature())) { @@ -173,8 +174,7 @@ public static CompletableFuture createFromConnectionString( .setRetryPolicy(retryPolicy) .setReactorFactory(reactorFactory) .setProxyConfiguration(proxyConfiguration) - .setWatchdogTimeout(watchdogTimeoutSeconds) - .setWatchdogScan(watchdogScanSeconds); + .setWatchdogTriggerTime(watchdogTriggerSeconds); return builder.build(); } @@ -191,8 +191,7 @@ public static class MessagingFactoryBuilder { private RetryPolicy retryPolicy = RetryPolicy.getDefault(); private ReactorFactory reactorFactory = new ReactorFactory(); private ProxyConfiguration proxyConfiguration; - private int watchdogTimeoutSeconds = EventHubClientOptions.WATCHDOG_OFF; - private int watchdogScanSeconds = EventHubClientOptions.WATCHDOG_SCAN_DEFAULT; + private int watchdogTriggerSeconds = EventHubClientOptions.WATCHDOG_OFF; public MessagingFactoryBuilder(final String hostname, final ITokenProvider tokenProvider, final ScheduledExecutorService executor) { if (StringUtil.isNullOrWhiteSpace(hostname)) { @@ -237,13 +236,8 @@ public MessagingFactoryBuilder setProxyConfiguration(ProxyConfiguration proxyCon return this; } - public MessagingFactoryBuilder setWatchdogTimeout(int watchdogTimeoutSeconds) { - this.watchdogTimeoutSeconds = watchdogTimeoutSeconds; - return this; - } - - public MessagingFactoryBuilder setWatchdogScan(int watchdogScanSeconds) { - this.watchdogScanSeconds = watchdogScanSeconds; + public MessagingFactoryBuilder setWatchdogTriggerTime(int watchdogTriggerSeconds) { + this.watchdogTriggerSeconds = watchdogTriggerSeconds; return this; } @@ -256,8 +250,7 @@ public CompletableFuture build() throws IOException { this.executor, this.reactorFactory, this.proxyConfiguration, - this.watchdogTimeoutSeconds, - this.watchdogScanSeconds); + this.watchdogTriggerSeconds); return MessagingFactory.factoryStartup(messagingFactory); } } @@ -292,29 +285,20 @@ public void run() { } public void registerForWatchdog(final MessageReceiver rcvr) { - if (this.watchdogTimeoutSeconds > EventHubClientOptions.WATCHDOG_OFF) { + if (this.watchdogTriggerSeconds > EventHubClientOptions.WATCHDOG_OFF) { TRACE_LOGGER.info("Registering for watchdog: " + rcvr.getClientId()); synchronized (this.watchdogSyncObject) { this.watchdogReceivers.add(rcvr); - - if (this.watchdogFuture == null) { - TRACE_LOGGER.info("Scheduling first watchdog"); - this.watchdogFuture = this.executor.schedule(new WatchDog(), this.watchdogScanSeconds, TimeUnit.SECONDS); - } } } // else ignore registration if watchdog is off } public void unregisterForWatchdog(final MessageReceiver rcvr) { - TRACE_LOGGER.info("Unregistering for watchdog: " + rcvr.getClientId()); - synchronized (this.watchdogSyncObject) { - this.watchdogReceivers.remove(rcvr); - - if (this.watchdogReceivers.isEmpty() && (this.watchdogFuture != null)) { - TRACE_LOGGER.info("Shutting down watchdog"); - this.watchdogFuture.cancel(false); - this.watchdogFuture = null; + if (this.watchdogTriggerSeconds > EventHubClientOptions.WATCHDOG_OFF) { + TRACE_LOGGER.info("Unregistering for watchdog: " + rcvr.getClientId()); + synchronized (this.watchdogSyncObject) { + this.watchdogReceivers.remove(rcvr); } } } @@ -326,31 +310,28 @@ public void run() { if (MessagingFactory.this.getIsClosingOrClosed()) { return; } - if (MessagingFactory.this.watchdogTimeoutSeconds <= EventHubClientOptions.WATCHDOG_OFF) { + if (MessagingFactory.this.watchdogTriggerSeconds <= EventHubClientOptions.WATCHDOG_OFF) { // TODO log an error, should never get here return; } - boolean silentReceiverDetected = false; - String silentClientId = "UNKNOWN"; - final long longestAgoAllowable = Instant.now().getEpochSecond() - MessagingFactory.this.watchdogTimeoutSeconds; + boolean anyReceiverIsAlive = false; + final long longestAgoAllowable = Instant.now().getEpochSecond() - MessagingFactory.this.watchdogTriggerSeconds; + LinkedList copiedList = null; synchronized (MessagingFactory.this.watchdogSyncObject) { - for (MessageReceiver rcvr : MessagingFactory.this.watchdogReceivers) { - TRACE_LOGGER.info("Watchdog checking receiver " + rcvr.getClientId() + " last: " + rcvr.getLastReceivedTime() + " allowable: " + longestAgoAllowable); - if (!rcvr.getIsClosingOrClosed() && (rcvr.getLastReceivedTime() <= longestAgoAllowable)) { - // TODO: is one dead receiver enough to declare the connection bad? - // Or should we wait for all receivers to be silent? - silentReceiverDetected = true; - silentClientId = rcvr.getClientId(); - TRACE_LOGGER.info("Watchdog declaring silence on " + silentClientId); - break; - } + copiedList = new LinkedList(MessagingFactory.this.watchdogReceivers); + } + for (MessageReceiver rcvr : copiedList) { + TRACE_LOGGER.info("Watchdog checking receiver " + rcvr.getClientId() + " last: " + rcvr.getLastReceivedTime() + " allowable: " + longestAgoAllowable); + if (!rcvr.getIsClosingOrClosed() && (rcvr.getLastReceivedTime() >= longestAgoAllowable)) { + anyReceiverIsAlive = true; + break; } } - if (silentReceiverDetected && !MessagingFactory.this.getIsClosingOrClosed()) { + if (!anyReceiverIsAlive && !MessagingFactory.this.getIsClosingOrClosed()) { TRACE_LOGGER.info("Watchdog forcing connection closed"); - ErrorCondition suspect = new ErrorCondition(ClientConstants.WATCHDOG_ERROR, "receiver watchdog has fired on " + silentClientId); + ErrorCondition suspect = new ErrorCondition(ClientConstants.WATCHDOG_ERROR, "receiver watchdog has fired, all receivers silent"); MessagingFactory.this.connection.setCondition(suspect); MessagingFactory.this.connection.close(); // TODO TODO TODO @@ -362,9 +343,7 @@ public void run() { } synchronized (MessagingFactory.this.watchdogSyncObject) { - if (!MessagingFactory.this.getIsClosingOrClosed() && - !MessagingFactory.this.watchdogReceivers.isEmpty() && - !MessagingFactory.this.watchdogFuture.isCancelled()) { + if (!MessagingFactory.this.getIsClosingOrClosed() && !MessagingFactory.this.watchdogFuture.isCancelled()) { TRACE_LOGGER.info("Watchdog scheduling next run"); MessagingFactory.this.watchdogFuture = MessagingFactory.this.executor.schedule(this, MessagingFactory.this.watchdogScanSeconds, TimeUnit.SECONDS); } From d1a4530b9b41aa4c24ae28f5d534e08a2752264b Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Fri, 17 Apr 2020 16:11:28 -0700 Subject: [PATCH 04/18] Add forced cleanup when needed --- .../eventhubs/impl/MessagingFactory.java | 72 +++++++++++++------ 1 file changed, 49 insertions(+), 23 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java index 8a0886b8a105..a6bd628800b2 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java @@ -70,6 +70,7 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti private final long watchdogTriggerSeconds; private ScheduledFuture watchdogFuture; private final long watchdogScanSeconds; + private boolean watchdogCleanupDone; private Reactor reactor; private ReactorDispatcher reactorDispatcher; @@ -120,10 +121,6 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti this.watchdogScanSeconds = watchdogTriggerSeconds / MessagingFactory.WATCHDOG_SCAN_DIVISOR; this.watchdogReceivers = new LinkedList(); this.watchdogSyncObject = new Object(); - if (this.watchdogTriggerSeconds > EventHubClientOptions.WATCHDOG_OFF) { - TRACE_LOGGER.info("Watchdog scheduling first run"); - this.watchdogFuture = this.executor.schedule(new WatchDog(), this.watchdogScanSeconds, TimeUnit.SECONDS); - } this.closeTask = new CompletableFuture<>(); } @@ -258,6 +255,8 @@ public CompletableFuture build() throws IOException { private static CompletableFuture factoryStartup(MessagingFactory messagingFactory) throws IOException { messagingFactory.createConnection(); + messagingFactory.startWatchdog(); + final Timer timer = new Timer(messagingFactory); messagingFactory.openTimer = timer.schedule( new Runnable() { @@ -303,6 +302,15 @@ public void unregisterForWatchdog(final MessageReceiver rcvr) { } } + private void startWatchdog() { + if (this.watchdogTriggerSeconds > EventHubClientOptions.WATCHDOG_OFF) { + TRACE_LOGGER.info("Watchdog scheduling first run"); + this.watchdogFuture = this.executor.schedule(new WatchDog(), this.watchdogScanSeconds, TimeUnit.SECONDS); + } else { + TRACE_LOGGER.info("Watchdog is OFF"); + } + } + private class WatchDog implements Runnable { @Override public void run() { @@ -315,31 +323,47 @@ public void run() { return; } - boolean anyReceiverIsAlive = false; - final long longestAgoAllowable = Instant.now().getEpochSecond() - MessagingFactory.this.watchdogTriggerSeconds; LinkedList copiedList = null; synchronized (MessagingFactory.this.watchdogSyncObject) { copiedList = new LinkedList(MessagingFactory.this.watchdogReceivers); } - for (MessageReceiver rcvr : copiedList) { - TRACE_LOGGER.info("Watchdog checking receiver " + rcvr.getClientId() + " last: " + rcvr.getLastReceivedTime() + " allowable: " + longestAgoAllowable); - if (!rcvr.getIsClosingOrClosed() && (rcvr.getLastReceivedTime() >= longestAgoAllowable)) { - anyReceiverIsAlive = true; - break; + if (!copiedList.isEmpty()) { + boolean anyReceiverIsAlive = false; + final long longestAgoAllowable = Instant.now().getEpochSecond() + - MessagingFactory.this.watchdogTriggerSeconds; + + for (MessageReceiver rcvr : copiedList) { + TRACE_LOGGER.info("Watchdog checking receiver " + rcvr.getClientId() + " last: " + + rcvr.getLastReceivedTime() + " allowable: " + longestAgoAllowable); + if (!rcvr.getIsClosingOrClosed() && (rcvr.getLastReceivedTime() >= longestAgoAllowable)) { + anyReceiverIsAlive = true; + // Found one live receiver, no need to check the rest. + break; + } } - } - if (!anyReceiverIsAlive && !MessagingFactory.this.getIsClosingOrClosed()) { - TRACE_LOGGER.info("Watchdog forcing connection closed"); - ErrorCondition suspect = new ErrorCondition(ClientConstants.WATCHDOG_ERROR, "receiver watchdog has fired, all receivers silent"); - MessagingFactory.this.connection.setCondition(suspect); - MessagingFactory.this.connection.close(); - // TODO TODO TODO - // If the remote host is still responding at the TCP level, then the socket will close normally - // and cleanup will happen automatically. However, if it isn't, then we must call onConnectionError - // here in order to force cleanup. Before this watchdog is released, must add timeout mechanism here - // that waits for some period and then forces cleanup if it has not happened already. - MessagingFactory.this.onConnectionError(suspect); + if (!anyReceiverIsAlive && !MessagingFactory.this.getIsClosingOrClosed()) { + TRACE_LOGGER.info("Watchdog forcing connection closed"); + ErrorCondition suspect = new ErrorCondition(ClientConstants.WATCHDOG_ERROR, + "receiver watchdog has fired, all receivers silent"); + MessagingFactory.this.watchdogCleanupDone = false; + MessagingFactory.this.connection.setCondition(suspect); + MessagingFactory.this.connection.close(); + // If the remote host is still responding at the TCP level, then the socket will + // close normally and cleanup/recreation will happen automatically. However, if it + // isn't, then we must call onConnectionError here in order to force cleanup and + // recreation. + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + } + if (!MessagingFactory.this.watchdogCleanupDone) { + TRACE_LOGGER.info("Watchdog forcing cleanup"); + MessagingFactory.this.onConnectionError(suspect); + } else { + TRACE_LOGGER.info("Watchdog cleanup already in progress"); + } + } } synchronized (MessagingFactory.this.watchdogSyncObject) { @@ -465,6 +489,8 @@ public void onOpenComplete(Exception exception) { @Override public void onConnectionError(ErrorCondition error) { + this.watchdogCleanupDone = true; + if (TRACE_LOGGER.isWarnEnabled()) { TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError messagingFactory[%s], hostname[%s], error[%s]", this.getClientId(), From fb51a49558fd3cf50b02b5e0f9df327bbe4e51e0 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Fri, 17 Apr 2020 16:15:12 -0700 Subject: [PATCH 05/18] Fix version number --- sdk/eventhubs/microsoft-azure-eventhubs/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml b/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml index 5032feb87aaa..0042217dbded 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml +++ b/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml @@ -14,7 +14,7 @@ 4.0.0 com.microsoft.azure azure-eventhubs - 9.9.0 + 3.2.0 Microsoft Azure SDK for Event Hubs Libraries built on Microsoft Azure Event Hubs From e7f24af5dc32989a7f48983e186fa2cc6e0da270 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Mon, 20 Apr 2020 17:49:47 -0700 Subject: [PATCH 06/18] Tracing improve and test fix --- .../azure/eventhubs/impl/MessagingFactory.java | 14 ++++++++------ .../MsgFactoryOpenCloseTest.java | 7 +++++-- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java index a6bd628800b2..4157b2b87121 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java @@ -314,12 +314,12 @@ private void startWatchdog() { private class WatchDog implements Runnable { @Override public void run() { - TRACE_LOGGER.info("watchdog run"); + TRACE_LOGGER.debug("Watchdog run"); if (MessagingFactory.this.getIsClosingOrClosed()) { return; } if (MessagingFactory.this.watchdogTriggerSeconds <= EventHubClientOptions.WATCHDOG_OFF) { - // TODO log an error, should never get here + TRACE_LOGGER.warn("Watchdog should not run when trigger time is " + MessagingFactory.this.watchdogTriggerSeconds + " -- stopping"); return; } @@ -333,7 +333,7 @@ public void run() { - MessagingFactory.this.watchdogTriggerSeconds; for (MessageReceiver rcvr : copiedList) { - TRACE_LOGGER.info("Watchdog checking receiver " + rcvr.getClientId() + " last: " + TRACE_LOGGER.debug("Watchdog checking receiver " + rcvr.getClientId() + " last: " + rcvr.getLastReceivedTime() + " allowable: " + longestAgoAllowable); if (!rcvr.getIsClosingOrClosed() && (rcvr.getLastReceivedTime() >= longestAgoAllowable)) { anyReceiverIsAlive = true; @@ -343,7 +343,7 @@ public void run() { } if (!anyReceiverIsAlive && !MessagingFactory.this.getIsClosingOrClosed()) { - TRACE_LOGGER.info("Watchdog forcing connection closed"); + TRACE_LOGGER.warn("Watchdog forcing connection closed"); ErrorCondition suspect = new ErrorCondition(ClientConstants.WATCHDOG_ERROR, "receiver watchdog has fired, all receivers silent"); MessagingFactory.this.watchdogCleanupDone = false; @@ -358,7 +358,7 @@ public void run() { } catch (InterruptedException e) { } if (!MessagingFactory.this.watchdogCleanupDone) { - TRACE_LOGGER.info("Watchdog forcing cleanup"); + TRACE_LOGGER.warn("Watchdog forcing cleanup"); MessagingFactory.this.onConnectionError(suspect); } else { TRACE_LOGGER.info("Watchdog cleanup already in progress"); @@ -368,8 +368,10 @@ public void run() { synchronized (MessagingFactory.this.watchdogSyncObject) { if (!MessagingFactory.this.getIsClosingOrClosed() && !MessagingFactory.this.watchdogFuture.isCancelled()) { - TRACE_LOGGER.info("Watchdog scheduling next run"); + TRACE_LOGGER.debug("Watchdog scheduling next run"); MessagingFactory.this.watchdogFuture = MessagingFactory.this.executor.schedule(this, MessagingFactory.this.watchdogScanSeconds, TimeUnit.SECONDS); + } else { + TRACE_LOGGER.info("Watchdog stopping due to MessagingFactory close"); } } } diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java index 216aa77e3674..aa5de1f6d067 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java @@ -7,6 +7,7 @@ import com.microsoft.azure.eventhubs.ConnectionStringBuilder; import com.microsoft.azure.eventhubs.EventData; import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.EventHubClientOptions; import com.microsoft.azure.eventhubs.EventPosition; import com.microsoft.azure.eventhubs.PartitionReceiveHandler; import com.microsoft.azure.eventhubs.PartitionReceiver; @@ -125,10 +126,12 @@ public void verifyThreadReleaseOnMsgFactoryOpenError() throws Exception { try { final CompletableFuture openFuture = MessagingFactory.createFromConnectionString( - connStr.toString(), null, + connStr.toString(), + null, executor, networkOutageSimulator, - null); + null, + EventHubClientOptions.WATCHDOG_OFF); try { openFuture.get(); Assert.fail(); From e63cce03100ffbb71c0af135276952189b1fc028 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Tue, 21 Apr 2020 17:48:26 -0700 Subject: [PATCH 07/18] Put version number back --- sdk/eventhubs/microsoft-azure-eventhubs/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml b/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml index 0042217dbded..5722c008b1c2 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml +++ b/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml @@ -14,7 +14,7 @@ 4.0.0 com.microsoft.azure azure-eventhubs - 3.2.0 + 3.2.0-beta.1 Microsoft Azure SDK for Event Hubs Libraries built on Microsoft Azure Event Hubs From e0d90f2664f95f3c20c23f42d00cf3b2f85efc9c Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Tue, 21 Apr 2020 18:06:02 -0700 Subject: [PATCH 08/18] Temporary fix until track2 build fixed --- sdk/eventhubs/pom.track1only.xml | 15 +++++++++++++++ 1 file changed, 15 insertions(+) create mode 100644 sdk/eventhubs/pom.track1only.xml diff --git a/sdk/eventhubs/pom.track1only.xml b/sdk/eventhubs/pom.track1only.xml new file mode 100644 index 000000000000..f2e6201995e9 --- /dev/null +++ b/sdk/eventhubs/pom.track1only.xml @@ -0,0 +1,15 @@ + + + 4.0.0 + com.azure + azure-eventhubs-service + pom + 1.0.0 + + microsoft-azure-eventhubs + microsoft-azure-eventhubs-eph + microsoft-azure-eventhubs-extensions + + From 8414932917caa4e849baf38276ffea8d0ab98150 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Wed, 22 Apr 2020 14:41:41 -0700 Subject: [PATCH 09/18] Make watchdog work with connection string --- .../azure/eventhubs/EventHubClient.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java index 8818a1b71193..712485456443 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java @@ -126,6 +126,30 @@ static CompletableFuture createFromConnectionString( return EventHubClientImpl.create(connectionString, retryPolicy, executor, proxyConfiguration); } + /** + * Factory method to create an instance of {@link EventHubClient} using the supplied {@code connectionString}. One + * EventHubClient instance maps to one connection to the Event Hubs service. + * + *

+ * The {@link EventHubClient} created from this method creates a Sender instance internally, which is used by + * the {@link #send(EventData)} methods. + *

+ * @param connectionString The connection string to be used. See {@link ConnectionStringBuilder} to construct a connectionString. + * @param retryPolicy A custom {@link RetryPolicy} to be used when communicating with EventHub. + * @param executor An {@link ScheduledExecutorService} to run all tasks performed by {@link EventHubClient}. + * @param proxyConfiguration The proxy configuration for this EventHubClient connection; {@code null} or + * {@link ProxyConfiguration#SYSTEM_DEFAULTS} if the system configured proxy settings should be used. + * @param watchdogTriggerSeconds If no receiver on this client gets a message for this many seconds, then the connection + * will be recreated. Use with caution! {@link EventHubClientOptions#WATCHDOG_OFF} to disable watchdog (the default). + * @return CompletableFuture{@literal } which can be used to create Senders and Receivers to EventHub. + * @throws IOException If the underlying Proton-J layer encounter network errors. + */ + static CompletableFuture createFromConnectionString( + final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor, + final ProxyConfiguration proxyConfiguration, final int watchdogTriggerSeconds) throws IOException { + return EventHubClientImpl.create(connectionString, retryPolicy, executor, proxyConfiguration, watchdogTriggerSeconds); + } + /** * Factory method to create an instance of {@link EventHubClient} using the supplied namespace endpoint address, eventhub name and authentication mechanism. * In a normal scenario (when re-direct is not enabled) - one EventHubClient instance maps to one Connection to the Azure ServiceBus EventHubs service. From b594b642e3cf17d7d995e8c11aa66109faa4616f Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Tue, 12 May 2020 16:08:57 -0700 Subject: [PATCH 10/18] Change watchdog in public API to silent time. --- .../azure/eventhubs/EventHubClient.java | 28 ++++++++++++++++--- .../eventhubs/EventHubClientOptions.java | 27 +++++++++--------- .../eventhubs/impl/EventHubClientImpl.java | 2 +- 3 files changed, 39 insertions(+), 18 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java index 712485456443..7658020050ac 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java @@ -70,6 +70,27 @@ static EventHubClient createFromConnectionStringSync(final String connectionStri } + /** + * Synchronous version of {@link #createFromConnectionString(String, RetryPolicy, ScheduledExecutorService, ProxyConfiguration, int)}. + * + * @param connectionString The connection string to be used. See {@link ConnectionStringBuilder} to construct a connectionString. + * @param retryPolicy A custom {@link RetryPolicy} to be used when communicating with EventHub. + * @param executor An {@link ScheduledExecutorService} to run all tasks performed by {@link EventHubClient}. + * @param configuration The proxy configuration for this EventHubClient connection; {@code null} or + * {@link ProxyConfiguration#SYSTEM_DEFAULTS} if the system configured proxy settings should be used. + * @param maximumSilentTime Use {@link EventHubClientOptions#SILENT_OFF} except on recommendation from the product group. + * @return EventHubClient which can be used to create Senders and Receivers to EventHub + * @throws EventHubException If Service Bus service encountered problems during connection creation. + * @throws IOException If the underlying Proton-J layer encounter network errors. + */ + static EventHubClient createFromConnectionStringSync(final String connectionString, final RetryPolicy retryPolicy, + final ScheduledExecutorService executor, + final ProxyConfiguration configuration, + final int maximumSilentTime) + throws EventHubException, IOException { + return ExceptionUtil.syncWithIOException(() -> createFromConnectionString(connectionString, retryPolicy, executor, configuration, maximumSilentTime).get()); + } + /** * Factory method to create an instance of {@link EventHubClient} using the supplied connectionString. * In a normal scenario (when re-direct is not enabled) - one EventHubClient instance maps to one Connection to the Azure ServiceBus EventHubs service. @@ -139,15 +160,14 @@ static CompletableFuture createFromConnectionString( * @param executor An {@link ScheduledExecutorService} to run all tasks performed by {@link EventHubClient}. * @param proxyConfiguration The proxy configuration for this EventHubClient connection; {@code null} or * {@link ProxyConfiguration#SYSTEM_DEFAULTS} if the system configured proxy settings should be used. - * @param watchdogTriggerSeconds If no receiver on this client gets a message for this many seconds, then the connection - * will be recreated. Use with caution! {@link EventHubClientOptions#WATCHDOG_OFF} to disable watchdog (the default). + * @param maximumSilentTime Use {@link EventHubClientOptions#SILENT_OFF} except on recommendation from the product group. * @return CompletableFuture{@literal } which can be used to create Senders and Receivers to EventHub. * @throws IOException If the underlying Proton-J layer encounter network errors. */ static CompletableFuture createFromConnectionString( final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor, - final ProxyConfiguration proxyConfiguration, final int watchdogTriggerSeconds) throws IOException { - return EventHubClientImpl.create(connectionString, retryPolicy, executor, proxyConfiguration, watchdogTriggerSeconds); + final ProxyConfiguration proxyConfiguration, final int maximumSilentTime) throws IOException { + return EventHubClientImpl.create(connectionString, retryPolicy, executor, proxyConfiguration, maximumSilentTime); } /** diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java index 5247fed277b0..abad5c0ca312 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java @@ -10,14 +10,14 @@ * All options default to not specified (null) */ public class EventHubClientOptions { - public static final int WATCHDOG_OFF = 0; - public static final int WATCHDOG_MINIMUM_SECONDS = 30; + public static final int SILENT_OFF = 0; + public static final int SILENT_MINIMUM_SECONDS = 30; private Duration operationTimeout = null; private TransportType transportType = null; private RetryPolicy retryPolicy = null; private ProxyConfiguration proxyConfiguration = null; - private int watchdogTriggerSeconds = WATCHDOG_OFF; + private int maximumSilentTime = SILENT_OFF; /** * Create with all defaults @@ -100,25 +100,26 @@ public ProxyConfiguration getProxyConfiguration() { } /** - * Sets the watchdog trigger time in seconds. + * Sets the maximum silent time, in seconds. + * Use only on recommendation from the product group. * - * @param watchdogTriggerSeconds The time in seconds, or WATCHDOG_OFF. Time must be at least WATCHDOG_MINIMUM_SECONDS. + * @param maximumSilentTime The time in seconds, or SILENT_OFF. Time must be at least SILENT_MINIMUM_SECONDS. * @return The updated options object. */ - public EventHubClientOptions setWatchdogTriggerTime(int watchdogTriggerSeconds) { - if (watchdogTriggerSeconds < EventHubClientOptions.WATCHDOG_MINIMUM_SECONDS) { - throw new IllegalArgumentException("Watchdog trigger time must be at least " + EventHubClientOptions.WATCHDOG_MINIMUM_SECONDS + " seconds"); + public EventHubClientOptions setMaximumSilentTime(int maximumSilentTime) { + if (maximumSilentTime < EventHubClientOptions.SILENT_MINIMUM_SECONDS) { + throw new IllegalArgumentException("Maximum silent time must be at least " + EventHubClientOptions.SILENT_MINIMUM_SECONDS + " seconds"); } - this.watchdogTriggerSeconds = watchdogTriggerSeconds; + this.maximumSilentTime = maximumSilentTime; return this; } /** - * Gets the watchdog trigger time in seconds. + * Gets the maximum silent time in seconds. * - * @return The watchdog trigger time, or WATCHDOG_OFF. + * @return The maximum silent time, or SILENT_OFF. */ - public int getWatchdogTriggerTime() { - return this.watchdogTriggerSeconds; + public int getMaximumSilentTime() { + return this.maximumSilentTime; } } diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java index 10087a0492a6..63ef83187088 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java @@ -114,7 +114,7 @@ public static CompletableFuture create( .setTransportType(options.getTransportType()) .setRetryPolicy(options.getRetryPolicy()) .setProxyConfiguration(options.getProxyConfiguration()) - .setWatchdogTriggerTime(options.getWatchdogTriggerTime()); + .setWatchdogTriggerTime(options.getMaximumSilentTime()); } return builder.build() From b551d08e9805807e91159a2ae17e5c88fae248ce Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Tue, 12 May 2020 16:15:56 -0700 Subject: [PATCH 11/18] Missed some WATCHDOG_OFF changes --- .../azure/eventhubs/impl/EventHubClientImpl.java | 2 +- .../azure/eventhubs/impl/MessagingFactory.java | 12 ++++++------ .../exceptioncontracts/MsgFactoryOpenCloseTest.java | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java index 63ef83187088..461849527a90 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java @@ -64,7 +64,7 @@ public static CompletableFuture create( final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ProxyConfiguration proxyConfiguration) throws IOException { - return create(connectionString, retryPolicy, executor, proxyConfiguration, EventHubClientOptions.WATCHDOG_OFF); + return create(connectionString, retryPolicy, executor, proxyConfiguration, EventHubClientOptions.SILENT_OFF); } public static CompletableFuture create( diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java index 4157b2b87121..08d69b9b84fa 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java @@ -134,7 +134,7 @@ public static CompletableFuture createFromConnectionString( final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ProxyConfiguration proxyConfiguration) throws IOException { - return createFromConnectionString(connectionString, retryPolicy, executor, null, proxyConfiguration, EventHubClientOptions.WATCHDOG_OFF); + return createFromConnectionString(connectionString, retryPolicy, executor, null, proxyConfiguration, EventHubClientOptions.SILENT_OFF); } public static CompletableFuture createFromConnectionString( @@ -188,7 +188,7 @@ public static class MessagingFactoryBuilder { private RetryPolicy retryPolicy = RetryPolicy.getDefault(); private ReactorFactory reactorFactory = new ReactorFactory(); private ProxyConfiguration proxyConfiguration; - private int watchdogTriggerSeconds = EventHubClientOptions.WATCHDOG_OFF; + private int watchdogTriggerSeconds = EventHubClientOptions.SILENT_OFF; public MessagingFactoryBuilder(final String hostname, final ITokenProvider tokenProvider, final ScheduledExecutorService executor) { if (StringUtil.isNullOrWhiteSpace(hostname)) { @@ -284,7 +284,7 @@ public void run() { } public void registerForWatchdog(final MessageReceiver rcvr) { - if (this.watchdogTriggerSeconds > EventHubClientOptions.WATCHDOG_OFF) { + if (this.watchdogTriggerSeconds > EventHubClientOptions.SILENT_OFF) { TRACE_LOGGER.info("Registering for watchdog: " + rcvr.getClientId()); synchronized (this.watchdogSyncObject) { this.watchdogReceivers.add(rcvr); @@ -294,7 +294,7 @@ public void registerForWatchdog(final MessageReceiver rcvr) { } public void unregisterForWatchdog(final MessageReceiver rcvr) { - if (this.watchdogTriggerSeconds > EventHubClientOptions.WATCHDOG_OFF) { + if (this.watchdogTriggerSeconds > EventHubClientOptions.SILENT_OFF) { TRACE_LOGGER.info("Unregistering for watchdog: " + rcvr.getClientId()); synchronized (this.watchdogSyncObject) { this.watchdogReceivers.remove(rcvr); @@ -303,7 +303,7 @@ public void unregisterForWatchdog(final MessageReceiver rcvr) { } private void startWatchdog() { - if (this.watchdogTriggerSeconds > EventHubClientOptions.WATCHDOG_OFF) { + if (this.watchdogTriggerSeconds > EventHubClientOptions.SILENT_OFF) { TRACE_LOGGER.info("Watchdog scheduling first run"); this.watchdogFuture = this.executor.schedule(new WatchDog(), this.watchdogScanSeconds, TimeUnit.SECONDS); } else { @@ -318,7 +318,7 @@ public void run() { if (MessagingFactory.this.getIsClosingOrClosed()) { return; } - if (MessagingFactory.this.watchdogTriggerSeconds <= EventHubClientOptions.WATCHDOG_OFF) { + if (MessagingFactory.this.watchdogTriggerSeconds <= EventHubClientOptions.SILENT_OFF) { TRACE_LOGGER.warn("Watchdog should not run when trigger time is " + MessagingFactory.this.watchdogTriggerSeconds + " -- stopping"); return; } diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java index aa5de1f6d067..39269d961e3f 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java @@ -131,7 +131,7 @@ public void verifyThreadReleaseOnMsgFactoryOpenError() throws Exception { executor, networkOutageSimulator, null, - EventHubClientOptions.WATCHDOG_OFF); + EventHubClientOptions.SILENT_OFF); try { openFuture.get(); Assert.fail(); From 4f143e593b719ba45c99f2a9d71a4d53ce6e3b50 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Wed, 13 May 2020 13:33:33 -0700 Subject: [PATCH 12/18] Update version for release --- sdk/eventhubs/microsoft-azure-eventhubs-eph/pom.xml | 4 ++-- sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml | 4 ++-- sdk/eventhubs/microsoft-azure-eventhubs/pom.xml | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs-eph/pom.xml b/sdk/eventhubs/microsoft-azure-eventhubs-eph/pom.xml index 898747047ba1..e498d5e7fdba 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs-eph/pom.xml +++ b/sdk/eventhubs/microsoft-azure-eventhubs-eph/pom.xml @@ -14,7 +14,7 @@ 4.0.0 com.microsoft.azure azure-eventhubs-eph - 3.2.0-beta.1 + 3.2.0 Microsoft Azure SDK for Event Hubs Event Processor Host(EPH) EPH is built on top of the Azure Event Hubs Client and provides a number of features not present in that lower layer @@ -35,7 +35,7 @@ com.microsoft.azure azure-eventhubs - 3.2.0-beta.1 + 3.2.0 com.microsoft.azure diff --git a/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml b/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml index 42d2699627fa..169c78893207 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml +++ b/sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml @@ -14,7 +14,7 @@ 4.0.0 com.microsoft.azure azure-eventhubs-extensions - 3.2.0-beta.1 + 3.2.0 Microsoft Azure SDK for Event Hubs Extensions Extensions built on Microsoft Azure Event Hubs @@ -35,7 +35,7 @@ com.microsoft.azure azure-eventhubs - 3.2.0-beta.1 + 3.2.0 org.apache.logging.log4j diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml b/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml index 7316375d177d..046333095214 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml +++ b/sdk/eventhubs/microsoft-azure-eventhubs/pom.xml @@ -14,7 +14,7 @@ 4.0.0 com.microsoft.azure azure-eventhubs - 3.2.0-beta.1 + 3.2.0 Microsoft Azure SDK for Event Hubs Libraries built on Microsoft Azure Event Hubs From 38af4b749897fc59dba0eb063d526936f0a6199d Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Wed, 13 May 2020 13:38:49 -0700 Subject: [PATCH 13/18] Remove unwanted file --- sdk/eventhubs/pom.track1only.xml | 15 --------------- 1 file changed, 15 deletions(-) delete mode 100644 sdk/eventhubs/pom.track1only.xml diff --git a/sdk/eventhubs/pom.track1only.xml b/sdk/eventhubs/pom.track1only.xml deleted file mode 100644 index f2e6201995e9..000000000000 --- a/sdk/eventhubs/pom.track1only.xml +++ /dev/null @@ -1,15 +0,0 @@ - - - 4.0.0 - com.azure - azure-eventhubs-service - pom - 1.0.0 - - microsoft-azure-eventhubs - microsoft-azure-eventhubs-eph - microsoft-azure-eventhubs-extensions - - From db4d9391bfe6e26ae19035c44d201bf9c5668f2d Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Tue, 19 May 2020 16:47:08 -0700 Subject: [PATCH 14/18] Change int seconds to Duration --- .../azure/eventhubs/EventHubClient.java | 7 ++-- .../eventhubs/EventHubClientOptions.java | 16 ++++----- .../eventhubs/impl/EventHubClientImpl.java | 4 +-- .../eventhubs/impl/MessagingFactory.java | 36 +++++++++---------- 4 files changed, 30 insertions(+), 33 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java index 7658020050ac..8477c3e1aaec 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.net.URI; import java.nio.channels.UnresolvedAddressException; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; @@ -71,7 +72,7 @@ static EventHubClient createFromConnectionStringSync(final String connectionStri /** - * Synchronous version of {@link #createFromConnectionString(String, RetryPolicy, ScheduledExecutorService, ProxyConfiguration, int)}. + * Synchronous version of {@link #createFromConnectionString(String, RetryPolicy, ScheduledExecutorService, ProxyConfiguration, Duration)}. * * @param connectionString The connection string to be used. See {@link ConnectionStringBuilder} to construct a connectionString. * @param retryPolicy A custom {@link RetryPolicy} to be used when communicating with EventHub. @@ -86,7 +87,7 @@ static EventHubClient createFromConnectionStringSync(final String connectionStri static EventHubClient createFromConnectionStringSync(final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ProxyConfiguration configuration, - final int maximumSilentTime) + final Duration maximumSilentTime) throws EventHubException, IOException { return ExceptionUtil.syncWithIOException(() -> createFromConnectionString(connectionString, retryPolicy, executor, configuration, maximumSilentTime).get()); } @@ -166,7 +167,7 @@ static CompletableFuture createFromConnectionString( */ static CompletableFuture createFromConnectionString( final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor, - final ProxyConfiguration proxyConfiguration, final int maximumSilentTime) throws IOException { + final ProxyConfiguration proxyConfiguration, final Duration maximumSilentTime) throws IOException { return EventHubClientImpl.create(connectionString, retryPolicy, executor, proxyConfiguration, maximumSilentTime); } diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java index abad5c0ca312..f47e7381bdcd 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClientOptions.java @@ -10,14 +10,14 @@ * All options default to not specified (null) */ public class EventHubClientOptions { - public static final int SILENT_OFF = 0; - public static final int SILENT_MINIMUM_SECONDS = 30; + public static final Duration SILENT_OFF = Duration.ofSeconds(0); + public static final Duration SILENT_MINIMUM = Duration.ofSeconds(30); private Duration operationTimeout = null; private TransportType transportType = null; private RetryPolicy retryPolicy = null; private ProxyConfiguration proxyConfiguration = null; - private int maximumSilentTime = SILENT_OFF; + private Duration maximumSilentTime = SILENT_OFF; /** * Create with all defaults @@ -103,12 +103,12 @@ public ProxyConfiguration getProxyConfiguration() { * Sets the maximum silent time, in seconds. * Use only on recommendation from the product group. * - * @param maximumSilentTime The time in seconds, or SILENT_OFF. Time must be at least SILENT_MINIMUM_SECONDS. + * @param maximumSilentTime The time, or SILENT_OFF. Time must be at least SILENT_MINIMUM. * @return The updated options object. */ - public EventHubClientOptions setMaximumSilentTime(int maximumSilentTime) { - if (maximumSilentTime < EventHubClientOptions.SILENT_MINIMUM_SECONDS) { - throw new IllegalArgumentException("Maximum silent time must be at least " + EventHubClientOptions.SILENT_MINIMUM_SECONDS + " seconds"); + public EventHubClientOptions setMaximumSilentTime(Duration maximumSilentTime) { + if (maximumSilentTime.compareTo(EventHubClientOptions.SILENT_MINIMUM) < 0) { + throw new IllegalArgumentException("Maximum silent time must be at least " + EventHubClientOptions.SILENT_MINIMUM.toMillis() + " milliseconds"); } this.maximumSilentTime = maximumSilentTime; return this; @@ -119,7 +119,7 @@ public EventHubClientOptions setMaximumSilentTime(int maximumSilentTime) { * * @return The maximum silent time, or SILENT_OFF. */ - public int getMaximumSilentTime() { + public Duration getMaximumSilentTime() { return this.maximumSilentTime; } } diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java index 461849527a90..d39fdc66955a 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java @@ -72,7 +72,7 @@ public static CompletableFuture create( final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ProxyConfiguration proxyConfiguration, - final int watchdogTriggerSeconds) + final Duration watchdogTriggerTime) throws IOException { if (StringUtil.isNullOrWhiteSpace(connectionString)) { throw new IllegalArgumentException("Connection string cannot be null or empty"); @@ -82,7 +82,7 @@ public static CompletableFuture create( final ConnectionStringBuilder connStr = new ConnectionStringBuilder(connectionString); final EventHubClientImpl eventHubClient = new EventHubClientImpl(connStr.getEventHubName(), executor); - return MessagingFactory.createFromConnectionString(connectionString, retryPolicy, executor, proxyConfiguration, watchdogTriggerSeconds) + return MessagingFactory.createFromConnectionString(connectionString, retryPolicy, executor, proxyConfiguration, watchdogTriggerTime) .thenApplyAsync(new Function() { @Override public EventHubClient apply(MessagingFactory factory) { diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java index 08d69b9b84fa..fdd0538a947f 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java @@ -67,7 +67,7 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti private static final long WATCHDOG_SCAN_DIVISOR = 2; private final LinkedList watchdogReceivers; private final Object watchdogSyncObject; - private final long watchdogTriggerSeconds; + private final Duration watchdogTriggerTime; private ScheduledFuture watchdogFuture; private final long watchdogScanSeconds; private boolean watchdogCleanupDone; @@ -92,7 +92,7 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti final ScheduledExecutorService executor, final ReactorFactory reactorFactory, final ProxyConfiguration proxyConfiguration, - final int watchdogTimeoutSeconds) { + final Duration watchdogTriggerTime) { super(StringUtil.getRandomString("MF"), null, executor); if (StringUtil.isNullOrWhiteSpace(hostname)) { @@ -117,8 +117,8 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti this.cbsChannelCreateLock = new Object(); this.mgmtChannelCreateLock = new Object(); - this.watchdogTriggerSeconds = watchdogTimeoutSeconds; - this.watchdogScanSeconds = watchdogTriggerSeconds / MessagingFactory.WATCHDOG_SCAN_DIVISOR; + this.watchdogTriggerTime = watchdogTriggerTime; + this.watchdogScanSeconds = watchdogTriggerTime.toMillis() / MessagingFactory.WATCHDOG_SCAN_DIVISOR / 1000; this.watchdogReceivers = new LinkedList(); this.watchdogSyncObject = new Object(); @@ -142,8 +142,8 @@ public static CompletableFuture createFromConnectionString( final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ProxyConfiguration proxyConfiguration, - final int watchdogTriggerSeconds) throws IOException { - return createFromConnectionString(connectionString, retryPolicy, executor, null, proxyConfiguration, watchdogTriggerSeconds); + final Duration watchdogTriggerTime) throws IOException { + return createFromConnectionString(connectionString, retryPolicy, executor, null, proxyConfiguration, watchdogTriggerTime); } public static CompletableFuture createFromConnectionString( @@ -152,7 +152,7 @@ public static CompletableFuture createFromConnectionString( final ScheduledExecutorService executor, final ReactorFactory reactorFactory, final ProxyConfiguration proxyConfiguration, - final int watchdogTriggerSeconds) throws IOException { + final Duration watchdogTriggerTime) throws IOException { final ConnectionStringBuilder csb = new ConnectionStringBuilder(connectionString); ITokenProvider tokenProvider = null; if (!StringUtil.isNullOrWhiteSpace(csb.getSharedAccessSignature())) { @@ -171,7 +171,7 @@ public static CompletableFuture createFromConnectionString( .setRetryPolicy(retryPolicy) .setReactorFactory(reactorFactory) .setProxyConfiguration(proxyConfiguration) - .setWatchdogTriggerTime(watchdogTriggerSeconds); + .setWatchdogTriggerTime(watchdogTriggerTime); return builder.build(); } @@ -188,7 +188,7 @@ public static class MessagingFactoryBuilder { private RetryPolicy retryPolicy = RetryPolicy.getDefault(); private ReactorFactory reactorFactory = new ReactorFactory(); private ProxyConfiguration proxyConfiguration; - private int watchdogTriggerSeconds = EventHubClientOptions.SILENT_OFF; + private Duration watchdogTriggerTime = EventHubClientOptions.SILENT_OFF; public MessagingFactoryBuilder(final String hostname, final ITokenProvider tokenProvider, final ScheduledExecutorService executor) { if (StringUtil.isNullOrWhiteSpace(hostname)) { @@ -233,8 +233,8 @@ public MessagingFactoryBuilder setProxyConfiguration(ProxyConfiguration proxyCon return this; } - public MessagingFactoryBuilder setWatchdogTriggerTime(int watchdogTriggerSeconds) { - this.watchdogTriggerSeconds = watchdogTriggerSeconds; + public MessagingFactoryBuilder setWatchdogTriggerTime(Duration watchdogTriggerTime) { + this.watchdogTriggerTime = watchdogTriggerTime; return this; } @@ -247,7 +247,7 @@ public CompletableFuture build() throws IOException { this.executor, this.reactorFactory, this.proxyConfiguration, - this.watchdogTriggerSeconds); + this.watchdogTriggerTime); return MessagingFactory.factoryStartup(messagingFactory); } } @@ -284,7 +284,7 @@ public void run() { } public void registerForWatchdog(final MessageReceiver rcvr) { - if (this.watchdogTriggerSeconds > EventHubClientOptions.SILENT_OFF) { + if (this.watchdogTriggerTime.compareTo(EventHubClientOptions.SILENT_OFF) > 0) { TRACE_LOGGER.info("Registering for watchdog: " + rcvr.getClientId()); synchronized (this.watchdogSyncObject) { this.watchdogReceivers.add(rcvr); @@ -294,7 +294,7 @@ public void registerForWatchdog(final MessageReceiver rcvr) { } public void unregisterForWatchdog(final MessageReceiver rcvr) { - if (this.watchdogTriggerSeconds > EventHubClientOptions.SILENT_OFF) { + if (this.watchdogTriggerTime.compareTo(EventHubClientOptions.SILENT_OFF) > 0) { TRACE_LOGGER.info("Unregistering for watchdog: " + rcvr.getClientId()); synchronized (this.watchdogSyncObject) { this.watchdogReceivers.remove(rcvr); @@ -303,7 +303,7 @@ public void unregisterForWatchdog(final MessageReceiver rcvr) { } private void startWatchdog() { - if (this.watchdogTriggerSeconds > EventHubClientOptions.SILENT_OFF) { + if (this.watchdogTriggerTime.compareTo(EventHubClientOptions.SILENT_OFF) > 0) { TRACE_LOGGER.info("Watchdog scheduling first run"); this.watchdogFuture = this.executor.schedule(new WatchDog(), this.watchdogScanSeconds, TimeUnit.SECONDS); } else { @@ -318,10 +318,6 @@ public void run() { if (MessagingFactory.this.getIsClosingOrClosed()) { return; } - if (MessagingFactory.this.watchdogTriggerSeconds <= EventHubClientOptions.SILENT_OFF) { - TRACE_LOGGER.warn("Watchdog should not run when trigger time is " + MessagingFactory.this.watchdogTriggerSeconds + " -- stopping"); - return; - } LinkedList copiedList = null; synchronized (MessagingFactory.this.watchdogSyncObject) { @@ -330,7 +326,7 @@ public void run() { if (!copiedList.isEmpty()) { boolean anyReceiverIsAlive = false; final long longestAgoAllowable = Instant.now().getEpochSecond() - - MessagingFactory.this.watchdogTriggerSeconds; + - (MessagingFactory.this.watchdogTriggerTime.toMillis() / 1000); for (MessageReceiver rcvr : copiedList) { TRACE_LOGGER.debug("Watchdog checking receiver " + rcvr.getClientId() + " last: " From 9dbc3e2a816aa6775b03e18af90b41b10fedfa43 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Wed, 20 May 2020 15:14:15 -0700 Subject: [PATCH 15/18] Minor logging improvement --- .../com/microsoft/azure/eventhubs/impl/MessagingFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java index fdd0538a947f..23c27053cbae 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java +++ b/sdk/eventhubs/microsoft-azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java @@ -304,7 +304,7 @@ public void unregisterForWatchdog(final MessageReceiver rcvr) { private void startWatchdog() { if (this.watchdogTriggerTime.compareTo(EventHubClientOptions.SILENT_OFF) > 0) { - TRACE_LOGGER.info("Watchdog scheduling first run"); + TRACE_LOGGER.info("Watchdog scheduling first run in " + this.watchdogScanSeconds + " seconds"); this.watchdogFuture = this.executor.schedule(new WatchDog(), this.watchdogScanSeconds, TimeUnit.SECONDS); } else { TRACE_LOGGER.info("Watchdog is OFF"); From 63d597025ae96e4ed7f53e2d084a360eaf28b1dd Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Wed, 20 May 2020 15:20:48 -0700 Subject: [PATCH 16/18] Update versioning --- eng/versioning/version_data.txt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/eng/versioning/version_data.txt b/eng/versioning/version_data.txt index 64953a34cd18..586ef92b542c 100644 --- a/eng/versioning/version_data.txt +++ b/eng/versioning/version_data.txt @@ -24,9 +24,9 @@ com.microsoft.azure.cognitiveservices:azure-cognitiveservices-customvision-predi com.microsoft.azure.cognitiveservices:azure-cognitiveservices-customvision-training;1.1.0-beta.3;1.1.0-beta.3 com.microsoft.azure.cognitiveservices:azure-cognitiveservices-faceapi;1.1.0-beta.1;1.1.0-beta.1 com.microsoft.azure.cognitiveservices:azure-cognitiveservices-qnamaker;1.0.0-beta;1.0.0-beta -com.microsoft.azure:azure-eventhubs;3.1.1;3.2.0-beta.1 -com.microsoft.azure:azure-eventhubs-eph;3.1.1;3.2.0-beta.1 -com.microsoft.azure:azure-eventhubs-extensions;3.1.1;3.2.0-beta.1 +com.microsoft.azure:azure-eventhubs;3.1.1;3.2.0 +com.microsoft.azure:azure-eventhubs-eph;3.1.1;3.2.0 +com.microsoft.azure:azure-eventhubs-extensions;3.1.1;3.2.0 com.microsoft.azure:azure-keyvault;1.2.4;1.3.0-beta.1 com.microsoft.azure:azure-keyvault-complete;1.2.4;1.3.0-beta.1 com.microsoft.azure:azure-keyvault-core;1.2.4;1.3.0-beta.1 From a58ed77590a7f28af2458d6488cf9cdb499f6973 Mon Sep 17 00:00:00 2001 From: James Birdsall Date: Thu, 21 May 2020 10:22:42 -0700 Subject: [PATCH 17/18] More versioning changes --- eng/spotbugs-aggregate-report/pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/eng/spotbugs-aggregate-report/pom.xml b/eng/spotbugs-aggregate-report/pom.xml index 3633f542a2ef..a60c9dff8963 100644 --- a/eng/spotbugs-aggregate-report/pom.xml +++ b/eng/spotbugs-aggregate-report/pom.xml @@ -117,17 +117,17 @@ com.microsoft.azure azure-eventhubs - 3.2.0-beta.1 + 3.2.0 com.microsoft.azure azure-eventhubs-eph - 3.2.0-beta.1 + 3.2.0 com.microsoft.azure azure-eventhubs-extensions - 3.2.0-beta.1 + 3.2.0