Skip to content

Commit

Permalink
Pull latest changes from azure-event-hubs-java (#3474)
Browse files Browse the repository at this point in the history
* Update Apache Proton-J dependency (0.29.0 --> 0.31.0) (#407)

* PartitionReceiver - add a method that provides an EventPosition which corresponds to an EventData returned last by the receiver (#408)

* Support IsPartitionEmpty property for PartitionRuntimeInformation (#399)

* Move setPrefetchCount API to the ReceiverOptions class from the PartitionReceiver and update the settings of Default & Max Prefetch count (#410)

This pull request includes two major changes related to Prefetch API.

1) Move setPrefetchCount API to the ReceiverOptions class so that prefetch value specified by a user can be used instead of using default value when communicating to the service during link open and initializing a receiver. This change also addresses the receiver stuck issue caused by setPrefetchAPI in a race condition.

2) Change the default value and set the upper bound of the prefetch count. Note that prefetch count should be greater than or equal to maxEventCount which can be set when either a) calling receive() API or b) implementing the getMaxEventCount API of the SessionReceiverHandler interface.

* Fixes several issues in the reactor related components (#411)

This pull request contains the following changes.

1) Finish pending tasks when recreating the reactor and make sure pending calls scheduled on the old reactor get complete.
2) Fix the session open timeout issue which can result in NPE in proton-J engine.
3) Make session open timeout configurable and use the value of OperationTimeout.
4) Update the message of exceptions and include an entity name in the exception message.
5) API change - use ScheduledExecutorService.
6) Improve tracing.

* Implement comparable on EventData (#395)

* Update receive/send link creation logic and improve tracing (#414)

* Prep for releasing client 2.0.0 and EPH 2.2.0 (#415)

* Ensure that links are closed when transport error occurrs (#417)

* ensure links are recreated on transport/connection failure
* update API document for EventProcessorOptions class
* add traces for link create/close case

* Prep for releasing client 2.1.0 and EPH 2.3.0 (#418)

* Update prefetch sendflow logic and increment version for new release (#420)

* Fix args for proxy auth call to Authenticator (#421)

* Prepare EPH 2.3.4 release (#423)

* Prepare EPH 2.4.0 release (#423) (#424)

* Handle proton:io errors with meaningful error msg (#427)

* Handle proton:io errors with meaningful error msg

* Use Proton-supplied message if present

* Minor changes to lease scanner (#428)

* Add logging if the scanner threw an exception.
* Change logging level to warn when scanner shuts down for any reason.
* Scanner can call EventProcessorOptions.notifyOfException, which calls user code. Change notifyOfException to defensively catch any exceptions coming out of user code.

* Make EventData.SystemProperties completely public (#435)

Porting testability changes from .NET Core to Java: provide full access to EventData's SystemProperties so that a complete EventData can be fabricated in tests.

* Digest Support: init first connection with null headers (#431)

Related to Azure/qpid-proton-j-extensions#10

* Fix lease scanner issues when Storage unreachable (#434)

This fix is for issue #432. There are two parts:

AzureStorageCheckpointLeaseManager performs certain Storage actions within a forEach. If those actions fail, the StorageException gets wrapped in a NoSuchElementException. Catch those and strip off the NoSuchElementException, then handle the StorageException in the existing way.

The unexpected NoSuchElementExceptions were not being caught anywhere and the scanner thread was dying without rescheduling itself. Added code in PartitionMananger.scan to catch any exceptions that leak out of PartitionScanner and reschedule the scanner unless the host instance is shutting down.

* message receiver - fix null pointer error and ensure that receive link is recreated upon a failure (#439)

* message receiver/sender - fix null pointer error and ensure that receive/send link is recreated on a failure.

* Update version numbers for release (#440)

* Update prefetch count for a receiver (#441)

* Fix an issue of creating multiple sessions for $management & $cbs channel for a single connection and improve logging (#443)

* Fix an issue of creating multiple sessions for $management & $cbs for a connection and improve logging

* Running through java files and double checking changes

* Fix casing on test names

* Ignore testcases that hang.

* Fix NullPointerException when there is no inner exception

* Move parent node to the top of the file.

* Update version numbers in spotbugs-reporting

* Increasing wait time until event hub scheduler is completed.
  • Loading branch information
conniey authored Apr 30, 2019
1 parent 3916131 commit 3b5a76c
Show file tree
Hide file tree
Showing 63 changed files with 515 additions and 396 deletions.
7 changes: 4 additions & 3 deletions eng/spotbugs-aggregate-report/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
<version>1.0.0</version>

<properties>
<azure-keyvault.version>1.2.0</azure-keyvault.version>
<azure-batch.version>5.0.1</azure-batch.version>
<azure-eventhubs.version>2.0.0</azure-eventhubs.version>
<azure-eventhubs.version>2.3.0</azure-eventhubs.version>
<azure-eventhubs-eph.version>2.5.0</azure-eventhubs-eph.version>
<azure-keyvault.version>1.2.0</azure-keyvault.version>
<azure-storage-blob.version>10.5.0</azure-storage-blob.version>
</properties>

Expand Down Expand Up @@ -67,7 +68,7 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-eph</artifactId>
<version>${azure-eventhubs.version}</version>
<version>${azure-eventhubs-eph.version}</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
Expand Down
8 changes: 4 additions & 4 deletions eventhubs/data-plane/ConsumingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ This library is available for use in Maven projects from the Maven Central Repos
following dependency declaration inside of your Maven project file:

```XML
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>2.0.0</version>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>2.3.0</version>
</dependency>
```

Expand Down
10 changes: 5 additions & 5 deletions eventhubs/data-plane/PublishingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ This library is available for use in Maven projects from the Maven Central Repos
following dependency declaration inside of your Maven project file:

```XML
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
<version>2.3.0</version>
</dependency>
```

For different types of build environments, the latest released JAR files can also be [explicitly obtained from the
Expand Down
16 changes: 8 additions & 8 deletions eventhubs/data-plane/azure-eventhubs-eph/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>2.3.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-eph</artifactId>
<version>2.2.0</version>
<version>2.5.0</version>

<name>Microsoft Azure SDK for Event Hubs Event Processor Host(EPH)</name>
<description>EPH is built on top of the Azure Event Hubs Client and provides a number of features not present in that lower layer</description>
<url>https://github.com/Azure/azure-sdk-for-java</url>

<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>2.0.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<distributionManagement>
<site>
<id>azure-java-build-docs</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Hashtable;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -331,10 +332,17 @@ public CompletableFuture<List<BaseLease>> getAllLeases() {
(bp.getLeaseState() == LeaseState.LEASED)));
});
future = CompletableFuture.completedFuture(infos);
} catch (URISyntaxException | StorageException e) {
} catch (URISyntaxException | StorageException | NoSuchElementException e) {
Throwable effective = e;
if (e instanceof NoSuchElementException) {
// If there is a StorageException in the forEach, it arrives wrapped in a NoSuchElementException.
// Strip the misleading NoSuchElementException to provide a meaningful error for the user.
effective = e.getCause();
}

TRACE_LOGGER.warn(this.hostContext.withHost("Failure while getting lease state details"), e);
future = new CompletableFuture<List<BaseLease>>();
future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.GETTING_LEASE));
future = new CompletableFuture<>();
future.completeExceptionally(LoggingUtils.wrapException(effective, EventProcessorHostActionStrings.GETTING_LEASE));
}

return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

import java.net.URISyntaxException;
import java.security.InvalidKeyException;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -23,7 +24,7 @@
import java.util.concurrent.atomic.AtomicInteger;

/***
* The main class of event processor host.
* The main class of event processor host.
*/
public final class EventProcessorHost {
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(EventProcessorHost.class);
Expand Down Expand Up @@ -268,15 +269,14 @@ public EventProcessorHost(
if (leaseManager == null) {
throw new IllegalArgumentException("Must provide an object which implements ILeaseManager");
}
// executorService argument is allowed to be null, that is the indication to use an internal threadpool.

// executorService argument is allowed to be null, that is the indication to use an internal threadpool.

// Normally will not be null because we're using the AzureStorage implementation.
// If it is null, we're using user-supplied implementation. Establish generic defaults
// in case the user doesn't provide an options object.
this.partitionManagerOptions = new PartitionManagerOptions();


if (executorService != null) {
// User has supplied an ExecutorService, so use that.
this.weOwnExecutor = false;
Expand Down Expand Up @@ -560,7 +560,7 @@ public Thread newThread(Runnable r) {
}

private String getNamePrefix() {
return String.format("[%s|%s|%s]-%s-",
return String.format(Locale.US, "[%s|%s|%s]-%s-",
this.entityName, this.consumerGroupName, this.hostName, POOL_NUMBER.getAndIncrement());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import java.util.function.Consumer;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/***
* Options affecting the behavior of the event processor host instance in general.
*/
Expand All @@ -25,6 +28,8 @@ public final class EventProcessorOptions {
return EventPosition.fromStartOfStream();
};

private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(EventProcessorOptions.class);

public EventProcessorOptions() {
}

Expand Down Expand Up @@ -112,7 +117,7 @@ public int getPrefetchCount() {
/***
* Sets the prefetch count for the underlying event hub client.
*
* The default is 500. This controls how many events are received in advance.
* The default is 300. This controls how many events are received in advance.
*
* @param prefetchCount The new prefetch count.
*/
Expand Down Expand Up @@ -210,7 +215,11 @@ void notifyOfException(String hostname, Exception exception, String action, Stri
// Capture handler so it doesn't get set to null between test and use
Consumer<ExceptionReceivedEventArgs> handler = this.exceptionNotificationHandler;
if (handler != null) {
handler.accept(new ExceptionReceivedEventArgs(hostname, exception, action, partitionId));
try {
handler.accept(new ExceptionReceivedEventArgs(hostname, exception, action, partitionId));
} catch (Exception e) {
TRACE_LOGGER.error("host " + hostname + ": caught exception from user-provided exception notification handler", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import java.util.concurrent.ConcurrentHashMap;

/***
* An ILeaseManager implementation based on an in-memory store.
* An ILeaseManager implementation based on an in-memory store.
*
* THIS CLASS IS PROVIDED AS A CONVENIENCE FOR TESTING ONLY. All data stored via this class is in memory
* only and not persisted in any way. In addition, it is only visible within the same process: multiple
Expand Down Expand Up @@ -46,11 +46,11 @@ public InMemoryLeaseManager() {
public void initialize(HostContext hostContext) {
this.hostContext = hostContext;
}

public void setLatency(long milliseconds) {
this.millisecondsLatency = milliseconds;
}

private void latency(String caller) {
if (this.millisecondsLatency > 0) {
try {
Expand Down Expand Up @@ -91,7 +91,7 @@ public CompletableFuture<Void> deleteLeaseStore() {
latency("deleteLeaseStore");
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<CompleteLease> getLease(String partitionId) {
TRACE_LOGGER.debug(this.hostContext.withHost("getLease()"));
Expand All @@ -110,7 +110,7 @@ public CompletableFuture<List<BaseLease>> getAllLeases() {
latency("getAllLeasesStateInfo");
return CompletableFuture.completedFuture(infos);
}

@Override
public CompletableFuture<Void> createAllLeasesIfNotExists(List<String> partitionIds) {
ArrayList<CompletableFuture<BaseLease>> createFutures = new ArrayList<CompletableFuture<BaseLease>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,16 +283,21 @@ private Void scan(boolean isFirst) {
TRACE_LOGGER.debug(this.hostContext.withHost("Starting lease scan"));
long start = System.currentTimeMillis();

(new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst)
try {
(new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst)
.whenCompleteAsync((didSteal, e) -> {
TRACE_LOGGER.debug(this.hostContext.withHost("Scanning took " + (System.currentTimeMillis() - start)));

if ((e != null) && !(e instanceof ClosingException)) {
TRACE_LOGGER.warn(this.hostContext.withHost("Lease scanner got exception"), e);
}

onPartitionCheckCompleteTestHook();

// Schedule the next scan unless we are shutting down.
if (!this.getIsClosingOrClosed()) {
int seconds = didSteal ? this.hostContext.getPartitionManagerOptions().getFastScanIntervalInSeconds()
: this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds();
: this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds();
if (isFirst) {
seconds = this.hostContext.getPartitionManagerOptions().getStartupScanDelayInSeconds();
}
Expand All @@ -301,9 +306,19 @@ private Void scan(boolean isFirst) {
}
TRACE_LOGGER.debug(this.hostContext.withHost("Scheduling lease scanner in " + seconds));
} else {
TRACE_LOGGER.debug(this.hostContext.withHost("Not scheduling lease scanner due to shutdown"));
TRACE_LOGGER.warn(this.hostContext.withHost("Not scheduling lease scanner due to shutdown"));
}
}, this.hostContext.getExecutor());
} catch (Exception e) {
TRACE_LOGGER.error(this.hostContext.withHost("Lease scanner threw directly"), e);
if (!this.getIsClosingOrClosed()) {
int seconds = this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds();
synchronized (this.scanFutureSynchronizer) {
this.scanFuture = this.hostContext.getExecutor().schedule(() -> scan(false), seconds, TimeUnit.SECONDS);
}
TRACE_LOGGER.debug(this.hostContext.withHost("Forced schedule of lease scanner in " + seconds));
}
}

return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private CompletableFuture<Void> openClientsRetryWrapper() {
// trace exceptions from the final attempt, or ReceiverDisconnectedException.
return retryResult.handleAsync((r, e) -> {
if (e == null) {
// IEventProcessor.onOpen is called from the base PartitionPump and must have returned in order for execution to reach here,
// IEventProcessor.onOpen is called from the base PartitionPump and must have returned in order for execution to reach here,
// meaning it is safe to set the handler and start calling IEventProcessor.onEvents.
this.partitionReceiver.setReceiveHandler(this, this.hostContext.getEventProcessorOptions().getInvokeProcessorAfterReceiveTimeout());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ private CompletableFuture<Boolean> stealLeases(List<BaseLease> stealThese) {

return allSteals;
}

private static class AcquisitionHolder {
private CompleteLease acquiredLease;

Expand Down
14 changes: 7 additions & 7 deletions eventhubs/data-plane/azure-eventhubs-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>2.3.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-extensions</artifactId>
Expand All @@ -12,13 +19,6 @@
<description>Extensions built on Microsoft Azure Event Hubs</description>
<url>https://github.com/Azure/azure-sdk-for-java</url>

<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>2.0.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<distributionManagement>
<site>
<id>azure-java-build-docs</id>
Expand Down
14 changes: 7 additions & 7 deletions eventhubs/data-plane/azure-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>2.3.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<modelVersion>4.0.0</modelVersion>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
Expand All @@ -12,13 +19,6 @@
<description>Libraries built on Microsoft Azure Event Hubs</description>
<url>https://github.com/Azure/azure-sdk-for-java</url>

<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>2.0.0</version>
<relativePath>../pom.xml</relativePath>
</parent>

<distributionManagement>
<site>
<id>azure-java-build-docs</id>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ static EventData create(final ByteBuffer buffer) {
* @see SystemProperties#getEnqueuedTime
*/
SystemProperties getSystemProperties();
void setSystemProperties(SystemProperties props);

class SystemProperties extends HashMap<String, Object> {
private static final long serialVersionUID = -2827050124966993723L;
Expand All @@ -155,6 +156,13 @@ public SystemProperties(final HashMap<String, Object> map) {
super(Collections.unmodifiableMap(map));
}

public SystemProperties(final long sequenceNumber, final Instant enqueuedTimeUtc, final String offset, final String partitionKey) {
this.put(AmqpConstants.SEQUENCE_NUMBER_ANNOTATION_NAME, sequenceNumber);
this.put(AmqpConstants.ENQUEUED_TIME_UTC_ANNOTATION_NAME, new Date(enqueuedTimeUtc.toEpochMilli()));
this.put(AmqpConstants.OFFSET_ANNOTATION_NAME, offset);
this.put(AmqpConstants.PARTITION_KEY_ANNOTATION_NAME, partitionKey);
}

public String getOffset() {
return this.getSystemProperty(AmqpConstants.OFFSET_ANNOTATION_NAME);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public interface PartitionReceiver {

int MINIMUM_PREFETCH_COUNT = 1;
int DEFAULT_PREFETCH_COUNT = 500;
int MAXIMUM_PREFETCH_COUNT = 2000;
int MAXIMUM_PREFETCH_COUNT = 8000;

long NULL_EPOCH = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public String getIdentifier() {
* EventHubs service will throw {@link QuotaExceededException} and will include this identifier.
* So, its very critical to choose a value, which can uniquely identify the whereabouts of {@link PartitionReceiver}.
* <p>
* </p>
*
* @param value string to identify {@link PartitionReceiver}
*/
Expand Down
Loading

0 comments on commit 3b5a76c

Please sign in to comment.