Skip to content
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
b711fac
More client-level watchdog changes
JamesBirdsall Mar 6, 2020
52de1af
Merge branch 'master' of github.com:azure/azure-sdk-for-java into wat…
JamesBirdsall Mar 25, 2020
343b2e7
Merge branch 'master' of github.com:azure/azure-sdk-for-java into wat…
JamesBirdsall Mar 26, 2020
3ade70b
Merge branch 'master' of github.com:azure/azure-sdk-for-java into wat…
JamesBirdsall Apr 15, 2020
55d9a84
Improve TODO comments
JamesBirdsall Apr 15, 2020
81371d7
Merge branch 'master' of github.com:azure/azure-sdk-for-java into wat…
JamesBirdsall Apr 16, 2020
9dfd9ce
Most changes from review
JamesBirdsall Apr 16, 2020
d1a4530
Add forced cleanup when needed
JamesBirdsall Apr 17, 2020
13cbae3
Merge branch 'master' of github.com:azure/azure-sdk-for-java into wat…
JamesBirdsall Apr 17, 2020
fb51a49
Fix version number
JamesBirdsall Apr 17, 2020
ecdd57f
Merge branch 'master' of github.com:azure/azure-sdk-for-java into wat…
JamesBirdsall Apr 21, 2020
e7f24af
Tracing improve and test fix
JamesBirdsall Apr 21, 2020
e63cce0
Put version number back
JamesBirdsall Apr 22, 2020
425470e
Merge branch 'master' of github.com:azure/azure-sdk-for-java into wat…
JamesBirdsall Apr 22, 2020
e0d90f2
Temporary fix until track2 build fixed
JamesBirdsall Apr 22, 2020
8414932
Make watchdog work with connection string
JamesBirdsall Apr 22, 2020
85a2827
Merge branch 'master' of github.com:azure/azure-sdk-for-java into wat…
JamesBirdsall Apr 22, 2020
ce78788
Merge branch 'master' of github.com:azure/azure-sdk-for-java into wat…
JamesBirdsall May 12, 2020
b594b64
Change watchdog in public API to silent time.
JamesBirdsall May 12, 2020
b551d08
Missed some WATCHDOG_OFF changes
JamesBirdsall May 12, 2020
4f143e5
Update version for release
JamesBirdsall May 13, 2020
5052d7a
Merge branch 'master' of github.com:azure/azure-sdk-for-java into wat…
JamesBirdsall May 13, 2020
38af4b7
Remove unwanted file
JamesBirdsall May 13, 2020
db4d939
Change int seconds to Duration
JamesBirdsall May 19, 2020
23a7cc6
Merge branch 'master' of github.com:azure/azure-sdk-for-java into wat…
JamesBirdsall May 19, 2020
9dbc3e2
Minor logging improvement
JamesBirdsall May 20, 2020
fbbe91e
Merge branch 'master' of github.com:azure/azure-sdk-for-java into wat…
JamesBirdsall May 20, 2020
63d5970
Update versioning
JamesBirdsall May 20, 2020
a58ed77
More versioning changes
JamesBirdsall May 21, 2020
e68f7f4
Merge branch 'master' of github.com:azure/azure-sdk-for-java into wat…
JamesBirdsall May 21, 2020
066a3ad
One last version update
JamesBirdsall May 21, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions eng/versioning/version_data.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ com.microsoft.azure.cognitiveservices:azure-cognitiveservices-customvision-predi
com.microsoft.azure.cognitiveservices:azure-cognitiveservices-customvision-training;1.1.0-beta.3;1.1.0-beta.3
com.microsoft.azure.cognitiveservices:azure-cognitiveservices-faceapi;1.1.0-beta.1;1.1.0-beta.1
com.microsoft.azure.cognitiveservices:azure-cognitiveservices-qnamaker;1.0.0-beta;1.0.0-beta
com.microsoft.azure:azure-eventhubs;3.1.1;3.2.0-beta.1
com.microsoft.azure:azure-eventhubs-eph;3.1.1;3.2.0-beta.1
com.microsoft.azure:azure-eventhubs-extensions;3.1.1;3.2.0-beta.1
com.microsoft.azure:azure-eventhubs;3.1.1;3.2.0
com.microsoft.azure:azure-eventhubs-eph;3.1.1;3.2.0
com.microsoft.azure:azure-eventhubs-extensions;3.1.1;3.2.0
com.microsoft.azure:azure-keyvault;1.2.4;1.3.0-beta.1
com.microsoft.azure:azure-keyvault-complete;1.2.4;1.3.0-beta.1
com.microsoft.azure:azure-keyvault-core;1.2.4;1.3.0-beta.1
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhubs/microsoft-azure-eventhubs-eph/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-eph</artifactId>
<version>3.2.0-beta.1</version> <!-- {x-version-update;com.microsoft.azure:azure-eventhubs-eph;current} -->
<version>3.2.0</version> <!-- {x-version-update;com.microsoft.azure:azure-eventhubs-eph;current} -->
Comment thread
JamesBirdsall marked this conversation as resolved.

<name>Microsoft Azure SDK for Event Hubs Event Processor Host(EPH)</name>
<description>EPH is built on top of the Azure Event Hubs Client and provides a number of features not present in that lower layer</description>
Expand All @@ -35,7 +35,7 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>3.2.0-beta.1</version> <!-- {x-version-update;com.microsoft.azure:azure-eventhubs;current} -->
<version>3.2.0</version> <!-- {x-version-update;com.microsoft.azure:azure-eventhubs;current} -->
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhubs/microsoft-azure-eventhubs-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-extensions</artifactId>
<version>3.2.0-beta.1</version> <!-- {x-version-update;com.microsoft.azure:azure-eventhubs-extensions;current} -->
<version>3.2.0</version> <!-- {x-version-update;com.microsoft.azure:azure-eventhubs-extensions;current} -->

<name>Microsoft Azure SDK for Event Hubs Extensions</name>
<description>Extensions built on Microsoft Azure Event Hubs</description>
Expand All @@ -35,7 +35,7 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>3.2.0-beta.1</version> <!-- {x-version-update;com.microsoft.azure:azure-eventhubs;current} -->
<version>3.2.0</version> <!-- {x-version-update;com.microsoft.azure:azure-eventhubs;current} -->
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhubs/microsoft-azure-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>3.2.0-beta.1</version> <!-- {x-version-update;com.microsoft.azure:azure-eventhubs;current} -->
<version>3.2.0</version> <!-- {x-version-update;com.microsoft.azure:azure-eventhubs;current} -->

<name>Microsoft Azure SDK for Event Hubs</name>
<description>Libraries built on Microsoft Azure Event Hubs</description>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import java.io.IOException;
import java.net.URI;
import java.nio.channels.UnresolvedAddressException;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;

Expand Down Expand Up @@ -70,6 +71,27 @@ static EventHubClient createFromConnectionStringSync(final String connectionStri
}


/**
* Synchronous version of {@link #createFromConnectionString(String, RetryPolicy, ScheduledExecutorService, ProxyConfiguration, Duration)}.
*
* @param connectionString The connection string to be used. See {@link ConnectionStringBuilder} to construct a connectionString.
* @param retryPolicy A custom {@link RetryPolicy} to be used when communicating with EventHub.
* @param executor An {@link ScheduledExecutorService} to run all tasks performed by {@link EventHubClient}.
* @param configuration The proxy configuration for this EventHubClient connection; {@code null} or
* {@link ProxyConfiguration#SYSTEM_DEFAULTS} if the system configured proxy settings should be used.
* @param maximumSilentTime Use {@link EventHubClientOptions#SILENT_OFF} except on recommendation from the product group.
* @return EventHubClient which can be used to create Senders and Receivers to EventHub
* @throws EventHubException If Service Bus service encountered problems during connection creation.
* @throws IOException If the underlying Proton-J layer encounter network errors.
*/
static EventHubClient createFromConnectionStringSync(final String connectionString, final RetryPolicy retryPolicy,
final ScheduledExecutorService executor,
final ProxyConfiguration configuration,
final Duration maximumSilentTime)
throws EventHubException, IOException {
return ExceptionUtil.syncWithIOException(() -> createFromConnectionString(connectionString, retryPolicy, executor, configuration, maximumSilentTime).get());
}

/**
* Factory method to create an instance of {@link EventHubClient} using the supplied connectionString.
* In a normal scenario (when re-direct is not enabled) - one EventHubClient instance maps to one Connection to the Azure ServiceBus EventHubs service.
Expand Down Expand Up @@ -126,6 +148,29 @@ static CompletableFuture<EventHubClient> createFromConnectionString(
return EventHubClientImpl.create(connectionString, retryPolicy, executor, proxyConfiguration);
}

/**
* Factory method to create an instance of {@link EventHubClient} using the supplied {@code connectionString}. One
* EventHubClient instance maps to one connection to the Event Hubs service.
*
* <p>
* The {@link EventHubClient} created from this method creates a Sender instance internally, which is used by
* the {@link #send(EventData)} methods.
* </p>
* @param connectionString The connection string to be used. See {@link ConnectionStringBuilder} to construct a connectionString.
* @param retryPolicy A custom {@link RetryPolicy} to be used when communicating with EventHub.
* @param executor An {@link ScheduledExecutorService} to run all tasks performed by {@link EventHubClient}.
* @param proxyConfiguration The proxy configuration for this EventHubClient connection; {@code null} or
* {@link ProxyConfiguration#SYSTEM_DEFAULTS} if the system configured proxy settings should be used.
* @param maximumSilentTime Use {@link EventHubClientOptions#SILENT_OFF} except on recommendation from the product group.
* @return CompletableFuture{@literal <EventHubClient>} which can be used to create Senders and Receivers to EventHub.
* @throws IOException If the underlying Proton-J layer encounter network errors.
*/
static CompletableFuture<EventHubClient> createFromConnectionString(
final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor,
final ProxyConfiguration proxyConfiguration, final Duration maximumSilentTime) throws IOException {
return EventHubClientImpl.create(connectionString, retryPolicy, executor, proxyConfiguration, maximumSilentTime);
}

/**
* Factory method to create an instance of {@link EventHubClient} using the supplied namespace endpoint address, eventhub name and authentication mechanism.
* In a normal scenario (when re-direct is not enabled) - one EventHubClient instance maps to one Connection to the Azure ServiceBus EventHubs service.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@
* All options default to not specified (null)
*/
public class EventHubClientOptions {
public static final Duration SILENT_OFF = Duration.ofSeconds(0);
public static final Duration SILENT_MINIMUM = Duration.ofSeconds(30);

private Duration operationTimeout = null;
private TransportType transportType = null;
private RetryPolicy retryPolicy = null;
private ProxyConfiguration proxyConfiguration = null;
private Duration maximumSilentTime = SILENT_OFF;

/**
* Create with all defaults
Expand Down Expand Up @@ -92,6 +96,30 @@ public EventHubClientOptions setProxyConfiguration(ProxyConfiguration proxyConfi
* @return Gets the proxy configuration.
*/
public ProxyConfiguration getProxyConfiguration() {
return proxyConfiguration;
return this.proxyConfiguration;
}

/**
* Sets the maximum silent time, in seconds.
* Use only on recommendation from the product group.
*
* @param maximumSilentTime The time, or SILENT_OFF. Time must be at least SILENT_MINIMUM.
* @return The updated options object.
*/
public EventHubClientOptions setMaximumSilentTime(Duration maximumSilentTime) {
if (maximumSilentTime.compareTo(EventHubClientOptions.SILENT_MINIMUM) < 0) {
throw new IllegalArgumentException("Maximum silent time must be at least " + EventHubClientOptions.SILENT_MINIMUM.toMillis() + " milliseconds");
}
this.maximumSilentTime = maximumSilentTime;
return this;
}

/**
* Gets the maximum silent time in seconds.
*
* @return The maximum silent time, or SILENT_OFF.
*/
public Duration getMaximumSilentTime() {
return this.maximumSilentTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public final class ClientConstants {
public static final Symbol STORE_LOCK_LOST_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":store-lock-lost");
public static final Symbol PUBLISHER_REVOKED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":publisher-revoked");
public static final Symbol TIMEOUT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout");
public static final Symbol WATCHDOG_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":watchdog");
public static final Symbol PROTON_IO_ERROR = Symbol.getSymbol(AmqpConstants.PROTON + ":io");
public static final Symbol TRACKING_ID_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":tracking-id");
public static final int MAX_MESSAGE_LENGTH_BYTES = 256 * 1024;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,16 @@ public static CompletableFuture<EventHubClient> create(
final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor,
final ProxyConfiguration proxyConfiguration)
throws IOException {
return create(connectionString, retryPolicy, executor, proxyConfiguration, EventHubClientOptions.SILENT_OFF);
}

public static CompletableFuture<EventHubClient> create(
final String connectionString,
final RetryPolicy retryPolicy,
final ScheduledExecutorService executor,
final ProxyConfiguration proxyConfiguration,
final Duration watchdogTriggerTime)
throws IOException {
if (StringUtil.isNullOrWhiteSpace(connectionString)) {
throw new IllegalArgumentException("Connection string cannot be null or empty");
}
Expand All @@ -72,7 +82,7 @@ public static CompletableFuture<EventHubClient> create(
final ConnectionStringBuilder connStr = new ConnectionStringBuilder(connectionString);
final EventHubClientImpl eventHubClient = new EventHubClientImpl(connStr.getEventHubName(), executor);

return MessagingFactory.createFromConnectionString(connectionString, retryPolicy, executor, proxyConfiguration)
return MessagingFactory.createFromConnectionString(connectionString, retryPolicy, executor, proxyConfiguration, watchdogTriggerTime)
.thenApplyAsync(new Function<MessagingFactory, EventHubClient>() {
@Override
public EventHubClient apply(MessagingFactory factory) {
Expand Down Expand Up @@ -103,7 +113,8 @@ public static CompletableFuture<EventHubClient> create(
builder.setOperationTimeout(options.getOperationTimeout())
.setTransportType(options.getTransportType())
.setRetryPolicy(options.getRetryPolicy())
.setProxyConfiguration(options.getProxyConfiguration());
.setProxyConfiguration(options.getProxyConfiguration())
.setWatchdogTriggerTime(options.getMaximumSilentTime());
}

return builder.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver,
private volatile CompletableFuture<?> closeTimer;
private int prefetchCount;
private Exception lastKnownLinkError;
private volatile long lastDeliveryReceivedTime;
private String linkCreationTime; // Used when looking at Java dumps, do not remove.

private MessageReceiver(final MessagingFactory factory,
Expand Down Expand Up @@ -184,6 +185,11 @@ public void onError(Exception error) {
},
ClientConstants.TOKEN_REFRESH_INTERVAL,
this.underlyingFactory);

this.underlyingFactory.registerForWatchdog(this);
// Set last-received time to receiver creation time. This means that a newly-created receiver will get
// the watchdog time to receive the first message before it is declared to be failed.
this.lastDeliveryReceivedTime = Instant.now().getEpochSecond();
}

// @param connection Connection on which the MessageReceiver's receive AMQP link need to be created on.
Expand Down Expand Up @@ -310,6 +316,15 @@ public void onOpenComplete(Exception exception) {

this.underlyingFactory.getRetryPolicy().resetRetryCount(this.underlyingFactory.getClientId());

// Resetting the watchdog time here means that when the watchdog has forced a connection closed, the
// new connection gets the watchdog time to receive a first message before it is declared to be dead.
// Without this, the watchdog would continue to measure from the last message received and the new
// connection would only get the watchdog scan time, which is normally a lot shorter. Using the longer
// time avoids the situation where the network or the service is slow and the client gets trapped in
// a loop killing connections because it doesn't wait long enough for a response.
TRACE_LOGGER.info("Watchdog reset timer for new connection on receiver " + this.getClientId());
this.lastDeliveryReceivedTime = Instant.now().getEpochSecond();

this.nextCreditToFlow = 0;
this.sendFlow(this.prefetchCount - this.prefetchedMessages.size());

Expand Down Expand Up @@ -380,6 +395,10 @@ private void cancelOpenTimer() {
}
}

public long getLastReceivedTime() {
return this.lastDeliveryReceivedTime;
}

@Override
public void onReceiveComplete(Delivery delivery) {
int msgSize = delivery.pending();
Expand All @@ -393,6 +412,7 @@ public void onReceiveComplete(Delivery delivery) {
delivery.settle();

this.prefetchedMessages.add(message);
this.lastDeliveryReceivedTime = Instant.now().getEpochSecond();
this.underlyingFactory.getRetryPolicy().resetRetryCount(this.getClientId());

this.receiveWork.onEvent();
Expand Down Expand Up @@ -796,6 +816,8 @@ public ErrorContext getContext() {
protected CompletableFuture<Void> onClose() {
if (!this.getIsClosed()) {
try {
this.underlyingFactory.unregisterForWatchdog(this);

this.activeClientTokenManager.cancel();
scheduleLinkCloseTimeout(TimeoutTracker.create(operationTimeout, this.receiveLink.getName()));

Expand Down
Loading