From 3b5a76c57ec4cb1644b23490b81e46618517fe8c Mon Sep 17 00:00:00 2001 From: Connie Yau Date: Mon, 29 Apr 2019 23:16:24 -0700 Subject: [PATCH] Pull latest changes from azure-event-hubs-java (#3474) * Update Apache Proton-J dependency (0.29.0 --> 0.31.0) (#407) * PartitionReceiver - add a method that provides an EventPosition which corresponds to an EventData returned last by the receiver (#408) * Support IsPartitionEmpty property for PartitionRuntimeInformation (#399) * Move setPrefetchCount API to the ReceiverOptions class from the PartitionReceiver and update the settings of Default & Max Prefetch count (#410) This pull request includes two major changes related to Prefetch API. 1) Move setPrefetchCount API to the ReceiverOptions class so that prefetch value specified by a user can be used instead of using default value when communicating to the service during link open and initializing a receiver. This change also addresses the receiver stuck issue caused by setPrefetchAPI in a race condition. 2) Change the default value and set the upper bound of the prefetch count. Note that prefetch count should be greater than or equal to maxEventCount which can be set when either a) calling receive() API or b) implementing the getMaxEventCount API of the SessionReceiverHandler interface. * Fixes several issues in the reactor related components (#411) This pull request contains the following changes. 1) Finish pending tasks when recreating the reactor and make sure pending calls scheduled on the old reactor get complete. 2) Fix the session open timeout issue which can result in NPE in proton-J engine. 3) Make session open timeout configurable and use the value of OperationTimeout. 4) Update the message of exceptions and include an entity name in the exception message. 5) API change - use ScheduledExecutorService. 6) Improve tracing. * Implement comparable on EventData (#395) * Update receive/send link creation logic and improve tracing (#414) * Prep for releasing client 2.0.0 and EPH 2.2.0 (#415) * Ensure that links are closed when transport error occurrs (#417) * ensure links are recreated on transport/connection failure * update API document for EventProcessorOptions class * add traces for link create/close case * Prep for releasing client 2.1.0 and EPH 2.3.0 (#418) * Update prefetch sendflow logic and increment version for new release (#420) * Fix args for proxy auth call to Authenticator (#421) * Prepare EPH 2.3.4 release (#423) * Prepare EPH 2.4.0 release (#423) (#424) * Handle proton:io errors with meaningful error msg (#427) * Handle proton:io errors with meaningful error msg * Use Proton-supplied message if present * Minor changes to lease scanner (#428) * Add logging if the scanner threw an exception. * Change logging level to warn when scanner shuts down for any reason. * Scanner can call EventProcessorOptions.notifyOfException, which calls user code. Change notifyOfException to defensively catch any exceptions coming out of user code. * Make EventData.SystemProperties completely public (#435) Porting testability changes from .NET Core to Java: provide full access to EventData's SystemProperties so that a complete EventData can be fabricated in tests. * Digest Support: init first connection with null headers (#431) Related to https://github.com/Azure/qpid-proton-j-extensions/pull/10 * Fix lease scanner issues when Storage unreachable (#434) This fix is for issue #432. There are two parts: AzureStorageCheckpointLeaseManager performs certain Storage actions within a forEach. If those actions fail, the StorageException gets wrapped in a NoSuchElementException. Catch those and strip off the NoSuchElementException, then handle the StorageException in the existing way. The unexpected NoSuchElementExceptions were not being caught anywhere and the scanner thread was dying without rescheduling itself. Added code in PartitionMananger.scan to catch any exceptions that leak out of PartitionScanner and reschedule the scanner unless the host instance is shutting down. * message receiver - fix null pointer error and ensure that receive link is recreated upon a failure (#439) * message receiver/sender - fix null pointer error and ensure that receive/send link is recreated on a failure. * Update version numbers for release (#440) * Update prefetch count for a receiver (#441) * Fix an issue of creating multiple sessions for $management & $cbs channel for a single connection and improve logging (#443) * Fix an issue of creating multiple sessions for $management & $cbs for a connection and improve logging * Running through java files and double checking changes * Fix casing on test names * Ignore testcases that hang. * Fix NullPointerException when there is no inner exception * Move parent node to the top of the file. * Update version numbers in spotbugs-reporting * Increasing wait time until event hub scheduler is completed. --- eng/spotbugs-aggregate-report/pom.xml | 7 ++- eventhubs/data-plane/ConsumingEvents.md | 8 +-- eventhubs/data-plane/PublishingEvents.md | 10 +-- .../data-plane/azure-eventhubs-eph/pom.xml | 16 ++--- .../AzureStorageCheckpointLeaseManager.java | 14 ++++- .../EventProcessorHost.java | 8 +-- .../EventProcessorOptions.java | 13 +++- .../InMemoryLeaseManager.java | 10 +-- .../eventprocessorhost/PartitionManager.java | 21 ++++++- .../eventprocessorhost/PartitionPump.java | 2 +- .../eventprocessorhost/PartitionScanner.java | 2 +- .../azure-eventhubs-extensions/pom.xml | 14 ++--- eventhubs/data-plane/azure-eventhubs/pom.xml | 14 ++--- .../microsoft/azure/eventhubs/EventData.java | 8 +++ .../azure/eventhubs/PartitionReceiver.java | 2 +- .../azure/eventhubs/ReceiverOptions.java | 1 - .../impl/ActiveClientTokenManager.java | 9 ++- .../azure/eventhubs/impl/AmqpConstants.java | 1 + .../azure/eventhubs/impl/BaseLinkHandler.java | 43 +++++++------ .../azure/eventhubs/impl/CBSChannel.java | 5 +- .../azure/eventhubs/impl/ClientConstants.java | 8 ++- .../eventhubs/impl/ConnectionHandler.java | 61 ++++++++----------- .../azure/eventhubs/impl/CustomIOHandler.java | 10 ++- .../eventhubs/impl/EventDataBatchImpl.java | 3 +- .../azure/eventhubs/impl/EventDataImpl.java | 4 ++ .../azure/eventhubs/impl/EventDataUtil.java | 3 - .../eventhubs/impl/EventHubClientImpl.java | 11 ++-- .../eventhubs/impl/EventPositionImpl.java | 3 +- .../azure/eventhubs/impl/ExceptionUtil.java | 9 ++- .../eventhubs/impl/FaultTolerantObject.java | 4 +- .../eventhubs/impl/ManagementChannel.java | 53 ++++++++-------- .../azure/eventhubs/impl/MessageReceiver.java | 60 ++++++++++++------ .../azure/eventhubs/impl/MessageSender.java | 38 ++++++++---- .../eventhubs/impl/MessagingFactory.java | 55 +++++++++++------ .../eventhubs/impl/PartitionReceiverImpl.java | 7 ++- .../eventhubs/impl/PartitionSenderImpl.java | 5 +- .../azure/eventhubs/impl/ProtonUtil.java | 4 +- .../azure/eventhubs/impl/ReactorHandler.java | 14 +++-- .../eventhubs/impl/ReceiveLinkHandler.java | 34 +++++------ .../azure/eventhubs/impl/ReceivePump.java | 11 ++-- .../impl/RequestResponseChannel.java | 4 +- .../eventhubs/impl/RequestResponseCloser.java | 4 +- .../eventhubs/impl/RequestResponseOpener.java | 41 ++++++++++++- .../azure/eventhubs/impl/SendLinkHandler.java | 31 +++++----- .../azure/eventhubs/impl/SessionHandler.java | 54 ++++++++-------- .../azure/eventhubs/impl/StringUtil.java | 7 ++- .../azure/eventhubs/impl/TimeoutTracker.java | 1 - .../azure/eventhubs/impl/TrackingUtil.java | 24 ++------ .../impl/WebSocketConnectionHandler.java | 2 +- .../impl/WebSocketProxyConnectionHandler.java | 43 +------------ .../MsgFactoryOpenCloseTest.java | 13 ++-- .../exceptioncontracts/ReactorFaultTest.java | 4 +- .../SecurityExceptionsTest.java | 3 +- .../SendLargeMessageTest.java | 3 +- .../lib/FaultInjectingReactorFactory.java | 6 +- .../azure/eventhubs/lib/SasTokenTestBase.java | 3 +- .../azure/eventhubs/lib/TestContext.java | 1 - .../sendrecv/ReceiveParallelManualTest.java | 5 +- .../azure/eventhubs/sendrecv/ReceiveTest.java | 3 +- .../azure/eventhubs/sendrecv/SendTest.java | 7 ++- eventhubs/data-plane/pom.xml | 17 +++--- eventhubs/data-plane/readme.md | 22 +++---- pom.client.xml | 8 +-- 63 files changed, 515 insertions(+), 396 deletions(-) diff --git a/eng/spotbugs-aggregate-report/pom.xml b/eng/spotbugs-aggregate-report/pom.xml index ec012ce84e0d..dbc497d34c77 100644 --- a/eng/spotbugs-aggregate-report/pom.xml +++ b/eng/spotbugs-aggregate-report/pom.xml @@ -15,9 +15,10 @@ 1.0.0 - 1.2.0 5.0.1 - 2.0.0 + 2.3.0 + 2.5.0 + 1.2.0 10.5.0 @@ -67,7 +68,7 @@ com.microsoft.azure azure-eventhubs-eph - ${azure-eventhubs.version} + ${azure-eventhubs-eph.version} com.microsoft.azure diff --git a/eventhubs/data-plane/ConsumingEvents.md b/eventhubs/data-plane/ConsumingEvents.md index cff94be57c9e..450247efcb71 100644 --- a/eventhubs/data-plane/ConsumingEvents.md +++ b/eventhubs/data-plane/ConsumingEvents.md @@ -23,10 +23,10 @@ This library is available for use in Maven projects from the Maven Central Repos following dependency declaration inside of your Maven project file: ```XML - - com.microsoft.azure - azure-eventhubs - 2.0.0 + + com.microsoft.azure + azure-eventhubs + 2.3.0 ``` diff --git a/eventhubs/data-plane/PublishingEvents.md b/eventhubs/data-plane/PublishingEvents.md index 5a9eb428e531..c83007c0174b 100644 --- a/eventhubs/data-plane/PublishingEvents.md +++ b/eventhubs/data-plane/PublishingEvents.md @@ -9,11 +9,11 @@ This library is available for use in Maven projects from the Maven Central Repos following dependency declaration inside of your Maven project file: ```XML - - com.microsoft.azure - azure-eventhubs - 2.0.0 - + + com.microsoft.azure + azure-eventhubs + 2.3.0 + ``` For different types of build environments, the latest released JAR files can also be [explicitly obtained from the diff --git a/eventhubs/data-plane/azure-eventhubs-eph/pom.xml b/eventhubs/data-plane/azure-eventhubs-eph/pom.xml index 33d31c4e0ace..649dc2322e69 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/pom.xml +++ b/eventhubs/data-plane/azure-eventhubs-eph/pom.xml @@ -4,22 +4,22 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + com.microsoft.azure + azure-eventhubs-clients + 2.3.0 + ../pom.xml + + 4.0.0 com.microsoft.azure azure-eventhubs-eph - 2.2.0 + 2.5.0 Microsoft Azure SDK for Event Hubs Event Processor Host(EPH) EPH is built on top of the Azure Event Hubs Client and provides a number of features not present in that lower layer https://github.com/Azure/azure-sdk-for-java - - com.microsoft.azure - azure-eventhubs-clients - 2.0.0 - ../pom.xml - - azure-java-build-docs diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java index 91aa4a89486a..e44b6d2ccdf6 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/AzureStorageCheckpointLeaseManager.java @@ -34,6 +34,7 @@ import java.util.Hashtable; import java.util.Iterator; import java.util.List; +import java.util.NoSuchElementException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.regex.Matcher; @@ -331,10 +332,17 @@ public CompletableFuture> getAllLeases() { (bp.getLeaseState() == LeaseState.LEASED))); }); future = CompletableFuture.completedFuture(infos); - } catch (URISyntaxException | StorageException e) { + } catch (URISyntaxException | StorageException | NoSuchElementException e) { + Throwable effective = e; + if (e instanceof NoSuchElementException) { + // If there is a StorageException in the forEach, it arrives wrapped in a NoSuchElementException. + // Strip the misleading NoSuchElementException to provide a meaningful error for the user. + effective = e.getCause(); + } + TRACE_LOGGER.warn(this.hostContext.withHost("Failure while getting lease state details"), e); - future = new CompletableFuture>(); - future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.GETTING_LEASE)); + future = new CompletableFuture<>(); + future.completeExceptionally(LoggingUtils.wrapException(effective, EventProcessorHostActionStrings.GETTING_LEASE)); } return future; diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java index 3f52e162b227..c9eaf61e266d 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorHost.java @@ -11,6 +11,7 @@ import java.net.URISyntaxException; import java.security.InvalidKeyException; +import java.util.Locale; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -23,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger; /*** - * The main class of event processor host. + * The main class of event processor host. */ public final class EventProcessorHost { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(EventProcessorHost.class); @@ -268,15 +269,14 @@ public EventProcessorHost( if (leaseManager == null) { throw new IllegalArgumentException("Must provide an object which implements ILeaseManager"); } - // executorService argument is allowed to be null, that is the indication to use an internal threadpool. + // executorService argument is allowed to be null, that is the indication to use an internal threadpool. // Normally will not be null because we're using the AzureStorage implementation. // If it is null, we're using user-supplied implementation. Establish generic defaults // in case the user doesn't provide an options object. this.partitionManagerOptions = new PartitionManagerOptions(); - if (executorService != null) { // User has supplied an ExecutorService, so use that. this.weOwnExecutor = false; @@ -560,7 +560,7 @@ public Thread newThread(Runnable r) { } private String getNamePrefix() { - return String.format("[%s|%s|%s]-%s-", + return String.format(Locale.US, "[%s|%s|%s]-%s-", this.entityName, this.consumerGroupName, this.hostName, POOL_NUMBER.getAndIncrement()); } diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java index 06637660e557..1b0bd2783dc0 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java @@ -11,6 +11,9 @@ import java.util.function.Consumer; import java.util.function.Function; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /*** * Options affecting the behavior of the event processor host instance in general. */ @@ -25,6 +28,8 @@ public final class EventProcessorOptions { return EventPosition.fromStartOfStream(); }; + private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(EventProcessorOptions.class); + public EventProcessorOptions() { } @@ -112,7 +117,7 @@ public int getPrefetchCount() { /*** * Sets the prefetch count for the underlying event hub client. * - * The default is 500. This controls how many events are received in advance. + * The default is 300. This controls how many events are received in advance. * * @param prefetchCount The new prefetch count. */ @@ -210,7 +215,11 @@ void notifyOfException(String hostname, Exception exception, String action, Stri // Capture handler so it doesn't get set to null between test and use Consumer handler = this.exceptionNotificationHandler; if (handler != null) { - handler.accept(new ExceptionReceivedEventArgs(hostname, exception, action, partitionId)); + try { + handler.accept(new ExceptionReceivedEventArgs(hostname, exception, action, partitionId)); + } catch (Exception e) { + TRACE_LOGGER.error("host " + hostname + ": caught exception from user-provided exception notification handler", e); + } } } diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryLeaseManager.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryLeaseManager.java index a2238c2bca9a..9b95d5457129 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryLeaseManager.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/InMemoryLeaseManager.java @@ -14,7 +14,7 @@ import java.util.concurrent.ConcurrentHashMap; /*** - * An ILeaseManager implementation based on an in-memory store. + * An ILeaseManager implementation based on an in-memory store. * * THIS CLASS IS PROVIDED AS A CONVENIENCE FOR TESTING ONLY. All data stored via this class is in memory * only and not persisted in any way. In addition, it is only visible within the same process: multiple @@ -46,11 +46,11 @@ public InMemoryLeaseManager() { public void initialize(HostContext hostContext) { this.hostContext = hostContext; } - + public void setLatency(long milliseconds) { this.millisecondsLatency = milliseconds; } - + private void latency(String caller) { if (this.millisecondsLatency > 0) { try { @@ -91,7 +91,7 @@ public CompletableFuture deleteLeaseStore() { latency("deleteLeaseStore"); return CompletableFuture.completedFuture(null); } - + @Override public CompletableFuture getLease(String partitionId) { TRACE_LOGGER.debug(this.hostContext.withHost("getLease()")); @@ -110,7 +110,7 @@ public CompletableFuture> getAllLeases() { latency("getAllLeasesStateInfo"); return CompletableFuture.completedFuture(infos); } - + @Override public CompletableFuture createAllLeasesIfNotExists(List partitionIds) { ArrayList> createFutures = new ArrayList>(); diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java index b6ed86fe462d..8f419a35e6e2 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionManager.java @@ -283,16 +283,21 @@ private Void scan(boolean isFirst) { TRACE_LOGGER.debug(this.hostContext.withHost("Starting lease scan")); long start = System.currentTimeMillis(); - (new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst) + try { + (new PartitionScanner(this.hostContext, (lease) -> this.pumpManager.addPump(lease), this)).scan(isFirst) .whenCompleteAsync((didSteal, e) -> { TRACE_LOGGER.debug(this.hostContext.withHost("Scanning took " + (System.currentTimeMillis() - start))); + if ((e != null) && !(e instanceof ClosingException)) { + TRACE_LOGGER.warn(this.hostContext.withHost("Lease scanner got exception"), e); + } + onPartitionCheckCompleteTestHook(); // Schedule the next scan unless we are shutting down. if (!this.getIsClosingOrClosed()) { int seconds = didSteal ? this.hostContext.getPartitionManagerOptions().getFastScanIntervalInSeconds() - : this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds(); + : this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds(); if (isFirst) { seconds = this.hostContext.getPartitionManagerOptions().getStartupScanDelayInSeconds(); } @@ -301,9 +306,19 @@ private Void scan(boolean isFirst) { } TRACE_LOGGER.debug(this.hostContext.withHost("Scheduling lease scanner in " + seconds)); } else { - TRACE_LOGGER.debug(this.hostContext.withHost("Not scheduling lease scanner due to shutdown")); + TRACE_LOGGER.warn(this.hostContext.withHost("Not scheduling lease scanner due to shutdown")); } }, this.hostContext.getExecutor()); + } catch (Exception e) { + TRACE_LOGGER.error(this.hostContext.withHost("Lease scanner threw directly"), e); + if (!this.getIsClosingOrClosed()) { + int seconds = this.hostContext.getPartitionManagerOptions().getSlowScanIntervalInSeconds(); + synchronized (this.scanFutureSynchronizer) { + this.scanFuture = this.hostContext.getExecutor().schedule(() -> scan(false), seconds, TimeUnit.SECONDS); + } + TRACE_LOGGER.debug(this.hostContext.withHost("Forced schedule of lease scanner in " + seconds)); + } + } return null; } diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java index ef774f314eae..d2e16d1e3c09 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionPump.java @@ -135,7 +135,7 @@ private CompletableFuture openClientsRetryWrapper() { // trace exceptions from the final attempt, or ReceiverDisconnectedException. return retryResult.handleAsync((r, e) -> { if (e == null) { - // IEventProcessor.onOpen is called from the base PartitionPump and must have returned in order for execution to reach here, + // IEventProcessor.onOpen is called from the base PartitionPump and must have returned in order for execution to reach here, // meaning it is safe to set the handler and start calling IEventProcessor.onEvents. this.partitionReceiver.setReceiveHandler(this, this.hostContext.getEventProcessorOptions().getInvokeProcessorAfterReceiveTimeout()); } else { diff --git a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java index 429798d6520e..bafa1af7b381 100644 --- a/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java +++ b/eventhubs/data-plane/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionScanner.java @@ -312,7 +312,7 @@ private CompletableFuture stealLeases(List stealThese) { return allSteals; } - + private static class AcquisitionHolder { private CompleteLease acquiredLease; diff --git a/eventhubs/data-plane/azure-eventhubs-extensions/pom.xml b/eventhubs/data-plane/azure-eventhubs-extensions/pom.xml index cb42b1e68178..45bbd35f6b58 100644 --- a/eventhubs/data-plane/azure-eventhubs-extensions/pom.xml +++ b/eventhubs/data-plane/azure-eventhubs-extensions/pom.xml @@ -4,6 +4,13 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + com.microsoft.azure + azure-eventhubs-clients + 2.3.0 + ../pom.xml + + 4.0.0 com.microsoft.azure azure-eventhubs-extensions @@ -12,13 +19,6 @@ Extensions built on Microsoft Azure Event Hubs https://github.com/Azure/azure-sdk-for-java - - com.microsoft.azure - azure-eventhubs-clients - 2.0.0 - ../pom.xml - - azure-java-build-docs diff --git a/eventhubs/data-plane/azure-eventhubs/pom.xml b/eventhubs/data-plane/azure-eventhubs/pom.xml index 93bc7d05635b..c7a1e80f3f5a 100644 --- a/eventhubs/data-plane/azure-eventhubs/pom.xml +++ b/eventhubs/data-plane/azure-eventhubs/pom.xml @@ -4,6 +4,13 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + com.microsoft.azure + azure-eventhubs-clients + 2.3.0 + ../pom.xml + + 4.0.0 com.microsoft.azure azure-eventhubs @@ -12,13 +19,6 @@ Libraries built on Microsoft Azure Event Hubs https://github.com/Azure/azure-sdk-for-java - - com.microsoft.azure - azure-eventhubs-clients - 2.0.0 - ../pom.xml - - azure-java-build-docs diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java index c361076eb9e7..e2091f2977e6 100755 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java @@ -147,6 +147,7 @@ static EventData create(final ByteBuffer buffer) { * @see SystemProperties#getEnqueuedTime */ SystemProperties getSystemProperties(); + void setSystemProperties(SystemProperties props); class SystemProperties extends HashMap { private static final long serialVersionUID = -2827050124966993723L; @@ -155,6 +156,13 @@ public SystemProperties(final HashMap map) { super(Collections.unmodifiableMap(map)); } + public SystemProperties(final long sequenceNumber, final Instant enqueuedTimeUtc, final String offset, final String partitionKey) { + this.put(AmqpConstants.SEQUENCE_NUMBER_ANNOTATION_NAME, sequenceNumber); + this.put(AmqpConstants.ENQUEUED_TIME_UTC_ANNOTATION_NAME, new Date(enqueuedTimeUtc.toEpochMilli())); + this.put(AmqpConstants.OFFSET_ANNOTATION_NAME, offset); + this.put(AmqpConstants.PARTITION_KEY_ANNOTATION_NAME, partitionKey); + } + public String getOffset() { return this.getSystemProperty(AmqpConstants.OFFSET_ANNOTATION_NAME); } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java index b48a67f11fc4..4edbcf56f790 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java @@ -25,7 +25,7 @@ public interface PartitionReceiver { int MINIMUM_PREFETCH_COUNT = 1; int DEFAULT_PREFETCH_COUNT = 500; - int MAXIMUM_PREFETCH_COUNT = 2000; + int MAXIMUM_PREFETCH_COUNT = 8000; long NULL_EPOCH = 0; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java index 315927bf8e67..5dae17cc69f4 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java @@ -79,7 +79,6 @@ public String getIdentifier() { * EventHubs service will throw {@link QuotaExceededException} and will include this identifier. * So, its very critical to choose a value, which can uniquely identify the whereabouts of {@link PartitionReceiver}. *

- *

* * @param value string to identify {@link PartitionReceiver} */ diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ActiveClientTokenManager.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ActiveClientTokenManager.java index a81285f71aeb..63a20902717f 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ActiveClientTokenManager.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ActiveClientTokenManager.java @@ -40,6 +40,10 @@ final class ActiveClientTokenManager { } public void cancel() { + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info(String.format(Locale.US, "clientEntity[%s] - canceling ActiveClientLinkManager", + clientEntity.getClientId())); + } synchronized (this.timerLock) { this.timer.cancel(false); @@ -61,9 +65,8 @@ public void run() { } else { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info( - String.format(Locale.US, - "clientEntity[%s] - closing ActiveClientLinkManager", clientEntity.getClientId())); + TRACE_LOGGER.info(String.format(Locale.US, "clientEntity[%s] - closing ActiveClientLinkManager", + clientEntity.getClientId())); } } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConstants.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConstants.java index eac5f39c9caa..cad341a75721 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConstants.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/AmqpConstants.java @@ -12,6 +12,7 @@ public final class AmqpConstants { public static final String APACHE = "apache.org"; + public static final String PROTON = "proton"; public static final String VENDOR = "com.microsoft"; public static final String AMQP_ANNOTATION_FORMAT = "amqp.annotation.%s >%s '%s'"; public static final String OFFSET_ANNOTATION_NAME = "x-opt-offset"; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/BaseLinkHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/BaseLinkHandler.java index 991c3b1cf9b2..45fe9885baca 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/BaseLinkHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/BaseLinkHandler.java @@ -12,12 +12,16 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Locale; + public class BaseLinkHandler extends BaseHandler { protected static final Logger TRACE_LOGGER = LoggerFactory.getLogger(BaseLinkHandler.class); + private final String name; private final AmqpLink underlyingEntity; - public BaseLinkHandler(final AmqpLink amqpLink) { + public BaseLinkHandler(final AmqpLink amqpLink, final String name) { + this.name = name; this.underlyingEntity = amqpLink; } @@ -27,10 +31,8 @@ public void onLinkLocalClose(Event event) { final ErrorCondition condition = link.getCondition(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("onLinkLocalClose linkName[%s], errorCondition[%s], errorDescription[%s]", - link.getName(), - condition != null ? condition.getCondition() : "n/a", - condition != null ? condition.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkLocalClose clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]", + this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a")); } closeSession(link, link.getCondition()); @@ -39,13 +41,11 @@ public void onLinkLocalClose(Event event) { @Override public void onLinkRemoteClose(Event event) { final Link link = event.getLink(); - final ErrorCondition condition = link.getCondition(); + final ErrorCondition condition = link.getRemoteCondition(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("onLinkRemoteClose linkName[%s], errorCondition[%s], errorDescription[%s]", - link.getName(), - condition != null ? condition.getCondition() : "n/a", - condition != null ? condition.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteClose clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]", + this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a")); } handleRemoteLinkClosed(event); @@ -57,10 +57,8 @@ public void onLinkRemoteDetach(Event event) { final ErrorCondition condition = link.getCondition(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("onLinkRemoteDetach linkName[%s], errorCondition[%s], errorDescription[%s]", - link.getName(), - condition != null ? condition.getCondition() : "n/a", - condition != null ? condition.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteDetach clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]", + this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a")); } handleRemoteLinkClosed(event); @@ -68,16 +66,19 @@ public void onLinkRemoteDetach(Event event) { public void processOnClose(Link link, ErrorCondition condition) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("processOnClose linkName[%s], errorCondition[%s], errorDescription[%s]", - link.getName(), - condition != null ? condition.getCondition() : "n/a", - condition != null ? condition.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "processOnClose clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]", + this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a")); } this.underlyingEntity.onClose(condition); } public void processOnClose(Link link, Exception exception) { + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info(String.format(Locale.US, "processOnClose clientName[%s], linkName[%s], exception[%s]", + this.name, link.getName(), exception != null ? exception.getMessage() : "n/a")); + } + this.underlyingEntity.onError(exception); } @@ -86,10 +87,8 @@ private void closeSession(Link link, ErrorCondition condition) { if (session != null && session.getLocalState() != EndpointState.CLOSED) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("closeSession for linkName[%s], errorCondition[%s], errorDescription[%s]", - link.getName(), - condition != null ? condition.getCondition() : "n/a", - condition != null ? condition.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "closeSession for clientName[%s], linkName[%s], errorCondition[%s], errorDescription[%s]", + this.name, link.getName(), condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a")); } session.setCondition(condition); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CBSChannel.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CBSChannel.java index 27b870ddf454..4dcfcd6c5c38 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CBSChannel.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CBSChannel.java @@ -17,11 +17,12 @@ final class CBSChannel { CBSChannel( final SessionProvider sessionProvider, - final AmqpConnection connection) { + final AmqpConnection connection, + final String clientId) { RequestResponseCloser closer = new RequestResponseCloser(); this.innerChannel = new FaultTolerantObject<>( - new RequestResponseOpener(sessionProvider, "cbs-session", "cbs", ClientConstants.CBS_ADDRESS, connection), + new RequestResponseOpener(sessionProvider, clientId, "cbs-session", "cbs", ClientConstants.CBS_ADDRESS, connection), closer); closer.setInnerChannel(this.innerChannel); } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java index 9d140815aef1..2fb6bee48ebe 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ClientConstants.java @@ -19,6 +19,7 @@ public final class ClientConstants { public static final Symbol STORE_LOCK_LOST_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":store-lock-lost"); public static final Symbol PUBLISHER_REVOKED_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":publisher-revoked"); public static final Symbol TIMEOUT_ERROR = Symbol.getSymbol(AmqpConstants.VENDOR + ":timeout"); + public static final Symbol PROTON_IO_ERROR = Symbol.getSymbol(AmqpConstants.PROTON + ":io"); public static final Symbol TRACKING_ID_PROPERTY = Symbol.getSymbol(AmqpConstants.VENDOR + ":tracking-id"); public static final int MAX_MESSAGE_LENGTH_BYTES = 256 * 1024; public static final int MAX_FRAME_SIZE_BYTES = 64 * 1024; @@ -26,7 +27,7 @@ public final class ClientConstants { public static final Duration TIMER_TOLERANCE = Duration.ofSeconds(1); public static final Duration DEFAULT_RETRY_MIN_BACKOFF = Duration.ofSeconds(0); public static final Duration DEFAULT_RETRY_MAX_BACKOFF = Duration.ofSeconds(30); - public static final Duration TOKEN_REFRESH_INTERVAL = Duration.ofMinutes(10); // renew every 10 mins, which expires 20 mins + public static final Duration TOKEN_REFRESH_INTERVAL = Duration.ofMinutes(5); // renew every 5 minutes, which expires 20 minutes public static final Duration TOKEN_VALIDITY = Duration.ofMinutes(20); public static final int DEFAULT_MAX_RETRY_COUNT = 10; public static final boolean DEFAULT_IS_TRANSIENT = true; @@ -36,7 +37,7 @@ public final class ClientConstants { public static final String NO_RETRY = "NoRetry"; public static final String DEFAULT_RETRY = "Default"; public static final String PRODUCT_NAME = "MSJavaClient"; - public static final String CURRENT_JAVACLIENT_VERSION = "2.0.0"; + public static final String CURRENT_JAVACLIENT_VERSION = "2.3.0"; public static final String PLATFORM_INFO = getPlatformInfo(); public static final String FRAMEWORK_INFO = getFrameworkInfo(); public static final String CBS_ADDRESS = "$cbs"; @@ -76,6 +77,9 @@ public final class ClientConstants { public static final String HTTPS_URI_FORMAT = "https://%s:%s"; public static final int MAX_RECEIVER_NAME_LENGTH = 64; + public static final String COMMUNICATION_EXCEPTION_GENERIC_MESSAGE = "A communication error has occurred. " + + "This may be due to an incorrect host name in your connection string or a problem with your network connection."; + /** * This is a constant defined to represent the start of a partition stream in EventHub. */ diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java index 445f0dc66f5e..1e8bf4c60685 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ConnectionHandler.java @@ -22,21 +22,21 @@ import java.util.Locale; import java.util.Map; -// ServiceBus <-> ProtonReactor interaction handles all -// amqp_connection/transport related events from reactor public class ConnectionHandler extends BaseHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class); private final AmqpConnection amqpConnection; + private final String connectionId; - protected ConnectionHandler(final AmqpConnection amqpConnection) { + protected ConnectionHandler(final AmqpConnection amqpConnection, final String connectionId) { add(new Handshaker()); this.amqpConnection = amqpConnection; + this.connectionId = connectionId; } - static ConnectionHandler create(TransportType transportType, AmqpConnection amqpConnection) { + static ConnectionHandler create(TransportType transportType, AmqpConnection amqpConnection, String connectionId) { switch (transportType) { case AMQP_WEB_SOCKETS: if (WebSocketProxyConnectionHandler.shouldUseProxy(amqpConnection.getHostName())) { @@ -46,7 +46,7 @@ static ConnectionHandler create(TransportType transportType, AmqpConnection amqp } case AMQP: default: - return new ConnectionHandler(amqpConnection); + return new ConnectionHandler(amqpConnection, connectionId); } } @@ -67,17 +67,18 @@ protected AmqpConnection getAmqpConnection() { @Override public void onConnectionInit(Event event) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onConnectionInit hostname[%s]", this.amqpConnection.getHostName())); + TRACE_LOGGER.info(String.format(Locale.US, "onConnectionInit hostname[%s], connectionId[%s]", + this.amqpConnection.getHostName(), this.connectionId)); } final Connection connection = event.getConnection(); final String hostName = new StringBuilder(this.amqpConnection.getHostName()) .append(":") - .append(String.valueOf(this.getProtocolPort())) + .append(this.getProtocolPort()) .toString(); connection.setHostname(hostName); - connection.setContainer(StringUtil.getRandomString()); + connection.setContainer(this.connectionId); final Map connectionProperties = new HashMap<>(); connectionProperties.put(AmqpConstants.PRODUCT, ClientConstants.PRODUCT_NAME); @@ -141,7 +142,8 @@ protected int getMaxFrameSize() { @Override public void onConnectionBound(Event event) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onConnectionBound hostname[%s]", this.amqpConnection.getHostName())); + TRACE_LOGGER.info(String.format(Locale.US, "onConnectionBound hostname[%s], connectionId[%s]", + this.amqpConnection.getHostName(), this.connectionId)); } final Transport transport = event.getTransport(); @@ -154,8 +156,8 @@ public void onConnectionUnbound(Event event) { final Connection connection = event.getConnection(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onConnectionUnbound: hostname[%s], state[%s], remoteState[%s]", - connection.getHostname(), connection.getLocalState(), connection.getRemoteState())); + TRACE_LOGGER.info(String.format(Locale.US, "onConnectionUnbound hostname[%s], connectionId[%s], state[%s], remoteState[%s]", + connection.getHostname(), this.connectionId, connection.getLocalState(), connection.getRemoteState())); } // if failure happened while establishing transport - nothing to free up. @@ -172,9 +174,8 @@ public void onTransportError(Event event) { final ErrorCondition condition = transport.getCondition(); if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn(String.format(Locale.US, "onTransportError: hostname[%s], error[%s]", - connection != null ? connection.getHostname() : "n/a", - condition != null ? condition.getDescription() : "n/a")); + TRACE_LOGGER.warn(String.format(Locale.US, "onTransportError hostname[%s], connectionId[%s], error[%s]", + connection != null ? connection.getHostname() : "n/a", this.connectionId, condition != null ? condition.getDescription() : "n/a")); } if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) { @@ -197,8 +198,8 @@ public void onTransportClosed(Event event) { final ErrorCondition condition = transport.getCondition(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed: hostname[%s], error[%s]", - connection != null ? connection.getHostname() : "n/a", (condition != null ? condition.getDescription() : "n/a"))); + TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed hostname[%s], connectionId[%s], error[%s]", + connection != null ? connection.getHostname() : "n/a", this.connectionId, (condition != null ? condition.getDescription() : "n/a"))); } if (connection != null && connection.getRemoteState() != EndpointState.CLOSED) { @@ -214,10 +215,8 @@ public void onConnectionLocalOpen(Event event) { final ErrorCondition error = connection.getCondition(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalOpen: hostname[%s], errorCondition[%s], errorDescription[%s]", - connection.getHostname(), - error != null ? error.getCondition() : "n/a", - error != null ? error.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalOpen hostname[%s], connectionId[%s], errorCondition[%s], errorDescription[%s]", + connection.getHostname(), this.connectionId, error != null ? error.getCondition() : "n/a", error != null ? error.getDescription() : "n/a")); } } @@ -225,8 +224,8 @@ public void onConnectionLocalOpen(Event event) { public void onConnectionRemoteOpen(Event event) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteOpen: hostname[%s], remoteContainer[%s]", - event.getConnection().getHostname(), event.getConnection().getRemoteContainer())); + TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteOpen hostname[%s], connectionId[%s], remoteContainer[%s]", + event.getConnection().getHostname(), this.connectionId, event.getConnection().getRemoteContainer())); } this.amqpConnection.onOpenComplete(null); @@ -239,10 +238,8 @@ public void onConnectionLocalClose(Event event) { final ErrorCondition error = connection.getCondition(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalClose: hostname[%s], errorCondition[%s], errorDescription[%s]", - connection.getHostname(), - error != null ? error.getCondition() : "n/a", - error != null ? error.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onConnectionLocalClose hostname[%s], connectionId[%s], errorCondition[%s], errorDescription[%s]", + connection.getHostname(), this.connectionId, error != null ? error.getCondition() : "n/a", error != null ? error.getDescription() : "n/a")); } if (connection.getRemoteState() == EndpointState.CLOSED) { @@ -261,10 +258,8 @@ public void onConnectionRemoteClose(Event event) { final ErrorCondition error = connection.getRemoteCondition(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteClose: hostname[%s], errorCondition[%s], errorDescription[%s]", - connection.getHostname(), - error != null ? error.getCondition() : "n/a", - error != null ? error.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onConnectionRemoteClose hostname[%s], connectionId[%s], errorCondition[%s], errorDescription[%s]", + connection.getHostname(), this.connectionId, error != null ? error.getCondition() : "n/a", error != null ? error.getDescription() : "n/a")); } this.amqpConnection.onConnectionError(error); @@ -276,10 +271,8 @@ public void onConnectionFinal(Event event) { final ErrorCondition error = connection.getCondition(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onConnectionFinal: hostname[%s], errorCondition[%s], errorDescription[%s]", - connection.getHostname(), - error != null ? error.getCondition() : "n/a", - error != null ? error.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onConnectionFinal hostname[%s], connectionId[%s], errorCondition[%s], errorDescription[%s]", + connection.getHostname(), this.connectionId, error != null ? error.getCondition() : "n/a", error != null ? error.getDescription() : "n/a")); } } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CustomIOHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CustomIOHandler.java index 2050bc5873d6..9169eb10b478 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CustomIOHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/CustomIOHandler.java @@ -16,14 +16,20 @@ public class CustomIOHandler extends IOHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(CustomIOHandler.class); + private final String name; + + public CustomIOHandler(final String name) { + this.name = name; + } + @Override public void onTransportClosed(Event event) { final Transport transport = event.getTransport(); final Connection connection = event.getConnection(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed hostname[%s]", - (connection != null ? connection.getHostname() : "n/a"))); + TRACE_LOGGER.info(String.format(Locale.US, "onTransportClosed name[%s], hostname[%s]", + this.name, (connection != null ? connection.getHostname() : "n/a"))); } if (transport != null && connection != null && connection.getTransport() != null) { diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataBatchImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataBatchImpl.java index 569792a4f5cd..54207e7e18a8 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataBatchImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataBatchImpl.java @@ -11,6 +11,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Locale; final class EventDataBatchImpl implements EventDataBatch { @@ -45,7 +46,7 @@ public boolean tryAdd(final EventData eventData) throws PayloadSizeExceededExcep try { size = getSize(eventDataImpl, events.isEmpty()); } catch (java.nio.BufferOverflowException exception) { - throw new PayloadSizeExceededException(String.format("Size of the payload exceeded Maximum message size: %s kb", this.maxMessageSize / 1024)); + throw new PayloadSizeExceededException(String.format(Locale.US, "Size of the payload exceeded Maximum message size: %s kb", this.maxMessageSize / 1024)); } if (this.currentSize + size > this.maxMessageSize) { diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataImpl.java index 12b4a9848404..b4294435f7e0 100755 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataImpl.java @@ -165,6 +165,10 @@ public SystemProperties getSystemProperties() { return this.systemProperties; } + public void setSystemProperties(EventData.SystemProperties props) { + this.systemProperties = props; + } + // This is intended to be used while sending EventData - so EventData.SystemProperties will not be copied over to the AmqpMessage Message toAmqpMessage() { final Message amqpMessage = Proton.message(); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java index dba9d1e99974..d8e1c3588e54 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventDataUtil.java @@ -14,9 +14,6 @@ import java.util.Set; import java.util.function.Consumer; -/* - * Internal utility class for EventData - */ final class EventDataUtil { @SuppressWarnings("serial") diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java index 5f3d339e35d2..a352172bb1dc 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventHubClientImpl.java @@ -50,15 +50,15 @@ public final class EventHubClientImpl extends ClientEntity implements EventHubCl private CompletableFuture createSender; private EventHubClientImpl(final ConnectionStringBuilder connectionString, final ScheduledExecutorService executor) { - super("EventHubClientImpl".concat(StringUtil.getRandomString()), null, executor); + super(StringUtil.getRandomString("EC"), null, executor); this.eventHubName = connectionString.getEventHubName(); this.senderCreateSync = new Object(); } public static CompletableFuture create( - final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor) - throws EventHubException, IOException { + final String connectionString, final RetryPolicy retryPolicy, final ScheduledExecutorService executor) + throws IOException { final ConnectionStringBuilder connStr = new ConnectionStringBuilder(connectionString); final EventHubClientImpl eventHubClient = new EventHubClientImpl(connStr, executor); @@ -232,7 +232,8 @@ private CompletableFuture createInternalSender() { if (!this.isSenderCreateStarted) { synchronized (this.senderCreateSync) { if (!this.isSenderCreateStarted) { - this.createSender = MessageSender.create(this.underlyingFactory, this.getClientId().concat("-InternalSender"), this.eventHubName) + String senderName = StringUtil.getRandomString("EC").concat(StringUtil.SEPARATOR + this.underlyingFactory.getClientId()).concat("-InternalSender"); + this.createSender = MessageSender.create(this.underlyingFactory, senderName, this.eventHubName) .thenAcceptAsync(new Consumer() { public void accept(MessageSender a) { EventHubClientImpl.this.sender = a; @@ -314,7 +315,7 @@ public CompletableFuture apply(Map private CompletableFuture addManagementToken(Map request) { CompletableFuture retval = null; try { - String audience = String.format("amqp://%s/%s", this.underlyingFactory.getHostName(), this.eventHubName); + String audience = String.format(Locale.US, "amqp://%s/%s", this.underlyingFactory.getHostName(), this.eventHubName); String token = this.underlyingFactory.getTokenProvider().getToken(audience, ClientConstants.TOKEN_REFRESH_INTERVAL); request.put(ClientConstants.MANAGEMENT_SECURITY_TOKEN_KEY, token); } catch (InvalidKeyException | NoSuchAlgorithmException | IOException e) { diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java index dbbb4ab5fe64..80f2cec4a44c 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/EventPositionImpl.java @@ -8,6 +8,7 @@ import org.slf4j.LoggerFactory; import java.time.Instant; +import java.util.Locale; public final class EventPositionImpl implements EventPosition { @@ -103,7 +104,7 @@ String getExpression() { @Override public String toString() { - return String.format("offset[%s], sequenceNumber[%s], enqueuedTime[%s], inclusiveFlag[%s]", + return String.format(Locale.US, "offset[%s], sequenceNumber[%s], enqueuedTime[%s], inclusiveFlag[%s]", this.offset, this.sequenceNumber, (this.dateTime != null) ? this.dateTime.toEpochMilli() : "null", this.inclusiveFlag); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ExceptionUtil.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ExceptionUtil.java index da5a59b8e4a1..621c582d2af3 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ExceptionUtil.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ExceptionUtil.java @@ -4,6 +4,7 @@ package com.microsoft.azure.eventhubs.impl; import com.microsoft.azure.eventhubs.AuthorizationFailedException; +import com.microsoft.azure.eventhubs.CommunicationException; import com.microsoft.azure.eventhubs.ErrorContext; import com.microsoft.azure.eventhubs.EventHubException; import com.microsoft.azure.eventhubs.IllegalEntityException; @@ -62,6 +63,12 @@ static Exception toException(ErrorCondition errorCondition) { return new EventHubException(true, new AmqpException(errorCondition)); } else if (errorCondition.getCondition() == AmqpErrorCode.ResourceLimitExceeded) { return new QuotaExceededException(new AmqpException(errorCondition)); + } else if (errorCondition.getCondition() == ClientConstants.PROTON_IO_ERROR) { + String message = ClientConstants.COMMUNICATION_EXCEPTION_GENERIC_MESSAGE; + if (errorCondition.getDescription() != null) { + message = errorCondition.getDescription(); + } + return new CommunicationException(message, null); } return new EventHubException(ClientConstants.DEFAULT_IS_TRANSIENT, errorCondition.getDescription()); @@ -137,7 +144,7 @@ public static String toStackTraceString(final Throwable exception, final String final Throwable innerException = exception.getCause(); if (innerException != null) { - builder.append("Cause: " + innerException.getMessage()); + builder.append("Cause: ").append(innerException.getMessage()); final StackTraceElement[] innerStackTraceElements = innerException.getStackTrace(); for (final StackTraceElement ste : innerStackTraceElements) { builder.append(System.lineSeparator()); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java index 4ae153061003..01e9006950c0 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/FaultTolerantObject.java @@ -47,7 +47,7 @@ public void runOnOpenedObject( public void onEvent() { if (!creatingNewInnerObject && (innerObject == null || innerObject.getState() == IOObject.IOObjectState.CLOSED - || innerObject.getState() == IOObject.IOObjectState.CLOSING)) { + || innerObject.getState() == IOObject.IOObjectState.CLOSING)) { creatingNewInnerObject = true; try { @@ -59,6 +59,7 @@ public void onComplete(T result) { for (OperationResult callback : openCallbacks) { callback.onComplete(result); } + openCallbacks.clear(); } @@ -67,6 +68,7 @@ public void onError(Exception error) { for (OperationResult callback : openCallbacks) { callback.onError(error); } + openCallbacks.clear(); } }); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ManagementChannel.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ManagementChannel.java index 7e74034d319e..b6f577165d58 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ManagementChannel.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ManagementChannel.java @@ -3,33 +3,34 @@ package com.microsoft.azure.eventhubs.impl; +import com.microsoft.azure.eventhubs.OperationCancelledException; +import com.microsoft.azure.eventhubs.TimeoutException; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.message.Message; import java.io.IOException; +import java.util.Locale; import java.util.Map; import java.util.concurrent.CompletableFuture; -import com.microsoft.azure.eventhubs.OperationCancelledException; -import com.microsoft.azure.eventhubs.TimeoutException; - final class ManagementChannel { final FaultTolerantObject innerChannel; - ManagementChannel(final SessionProvider sessionProvider, final AmqpConnection connection) { + ManagementChannel(final SessionProvider sessionProvider, final AmqpConnection connection, final String clientId) { final RequestResponseCloser closer = new RequestResponseCloser(); this.innerChannel = new FaultTolerantObject<>( - new RequestResponseOpener( - sessionProvider, - "mgmt-session", - "mgmt", - ClientConstants.MANAGEMENT_ADDRESS, - connection), - closer); + new RequestResponseOpener( + sessionProvider, + clientId, + "mgmt-session", + "mgmt", + ClientConstants.MANAGEMENT_ADDRESS, + connection), + closer); closer.setInnerChannel(this.innerChannel); } @@ -45,22 +46,22 @@ public CompletableFuture> request( try { // schedule client-timeout on the request dispatcher.invoke((int) timeoutInMillis, - new DispatchHandler() { - @Override - public void onEvent() { - final RequestResponseChannel channel = innerChannel.unsafeGetIfOpened(); - final String errorMessage; - if (channel != null && channel.getState() == IOObject.IOObjectState.OPENED) { - final String remoteContainerId = channel.getSendLink().getSession().getConnection().getRemoteContainer(); - errorMessage = String.format("Management request timed out (%sms), after not receiving response from service. TrackingId: %s", - timeoutInMillis, StringUtil.isNullOrEmpty(remoteContainerId) ? "n/a" : remoteContainerId); - } else { - errorMessage = "Management request timed out on the client - enable info level tracing to diagnose."; + new DispatchHandler() { + @Override + public void onEvent() { + final RequestResponseChannel channel = innerChannel.unsafeGetIfOpened(); + final String errorMessage; + if (channel != null && channel.getState() == IOObject.IOObjectState.OPENED) { + final String remoteContainerId = channel.getSendLink().getSession().getConnection().getRemoteContainer(); + errorMessage = String.format(Locale.US, "Management request timed out (%sms), after not receiving response from service. TrackingId: %s", + timeoutInMillis, StringUtil.isNullOrEmpty(remoteContainerId) ? "n/a" : remoteContainerId); + } else { + errorMessage = "Management request timed out on the client - enable info level tracing to diagnose."; + } + + resultFuture.completeExceptionally(new TimeoutException(errorMessage)); } - - resultFuture.completeExceptionally(new TimeoutException(errorMessage)); - } - }); + }); } catch (final IOException ioException) { resultFuture.completeExceptionally( new OperationCancelledException( diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java index 3a780267dba9..655709389517 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageReceiver.java @@ -77,6 +77,7 @@ public final class MessageReceiver extends ClientEntity implements AmqpReceiver, private volatile CompletableFuture closeTimer; private int prefetchCount; private Exception lastKnownLinkError; + private String linkCreationTime; private MessageReceiver(final MessagingFactory factory, final String name, @@ -124,7 +125,7 @@ public void run() { "clientId[%s], path[%s], linkName[%s] - Reschedule operation timer, current: [%s], remaining: [%s] secs", getClientId(), receivePath, - receiveLink.getName(), + getReceiveLinkName(), Instant.now(), timeoutTracker.remaining().getSeconds())); } @@ -160,7 +161,7 @@ public void onComplete(Void result) { TRACE_LOGGER.debug( String.format(Locale.US, "clientId[%s], path[%s], linkName[%s] - token renewed", - getClientId(), receivePath, receiveLink.getName())); + getClientId(), receivePath, getReceiveLinkName())); } } @@ -170,7 +171,7 @@ public void onError(Exception error) { TRACE_LOGGER.info( String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], tokenRenewalFailure[%s]", - getClientId(), receivePath, receiveLink.getName(), error.getMessage())); + getClientId(), receivePath, getReceiveLinkName(), error.getMessage())); } } }); @@ -179,7 +180,7 @@ public void onError(Exception error) { TRACE_LOGGER.info( String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], tokenRenewalScheduleFailure[%s]", - getClientId(), receivePath, receiveLink.getName(), exception.getMessage())); + getClientId(), receivePath, getReceiveLinkName(), exception.getMessage())); } } } @@ -242,6 +243,10 @@ private List receiveCore(final int messageCount) { return returnMessages; } + private String getReceiveLinkName() { + return this.receiveLink == null ? "null" : this.receiveLink.getName(); + } + public Duration getReceiveTimeout() { return this.receiveTimeout; } @@ -269,7 +274,7 @@ public CompletableFuture> receive(final int maxMessageCount) "clientId[%s], path[%s], linkName[%s] - schedule operation timer, current: [%s], remaining: [%s] secs", this.getClientId(), this.receivePath, - this.receiveLink.getName(), + this.getReceiveLinkName(), Instant.now(), this.receiveTimeout.getSeconds())); } @@ -313,8 +318,8 @@ public void onOpenComplete(Exception exception) { this.sendFlow(this.prefetchCount - this.prefetchedMessages.size()); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("onOpenComplete - clientId[%s], receiverPath[%s], linkName[%s], updated-link-credit[%s], sentCredits[%s]", - this.getClientId(), this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), this.prefetchCount)); + TRACE_LOGGER.info(String.format(Locale.US, "onOpenComplete - clientId[%s], receiverPath[%s], linkName[%s], updated-link-credit[%s], sentCredits[%s]", + this.getClientId(), this.receivePath, this.getReceiveLinkName(), this.receiveLink.getCredit(), this.prefetchCount)); } } else { synchronized (this.errorConditionLock) { @@ -409,11 +414,12 @@ public void onError(final Exception exception) { : exception; if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn("clientId[{}], receiverPath[{}], linkName[{}], onError: {}", + TRACE_LOGGER.warn( + String.format(Locale.US, "clientId[%s], receiverPath[%s], linkName[%s], onError: %s", this.getClientId(), this.receivePath, - this.receiveLink != null ? this.receiveLink.getName() : "n/a", - completionException); + this.getReceiveLinkName(), + completionException)); } this.onOpenComplete(completionException); @@ -430,7 +436,7 @@ public void onError(final Exception exception) { @Override public void onEvent() { if (!MessageReceiver.this.getIsClosingOrClosed() - && (receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)) { + && (receiveLink == null || receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)) { createReceiveLink(); underlyingFactory.getRetryPolicy().incrementRetryCount(getClientId()); } @@ -443,7 +449,7 @@ public void onEvent() { String.format(Locale.US, "clientId[%s], receiverPath[%s], linkName[%s], scheduling createLink encountered error: %s", this.getClientId(), this.receivePath, - this.receiveLink.getName(), ignore.getLocalizedMessage())); + this.getReceiveLinkName(), ignore.getLocalizedMessage())); } } } @@ -478,6 +484,13 @@ private void scheduleOperationTimer(final TimeoutTracker tracker) { private void createReceiveLink() { synchronized (this.errorConditionLock) { if (this.creatingLink) { + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info( + String.format(Locale.US, + "clientId[%s], path[%s], operationTimeout[%s], creating a receive link is already in progress", + this.getClientId(), this.receivePath, this.operationTimeout)); + } + return; } @@ -491,6 +504,8 @@ private void createReceiveLink() { this.getClientId(), this.receivePath, this.operationTimeout)); } + this.linkCreationTime = Instant.now().toString(); + this.scheduleLinkOpenTimeout(TimeoutTracker.create(this.operationTimeout)); final Consumer onSessionOpen = new Consumer() { @@ -498,6 +513,12 @@ private void createReceiveLink() { public void accept(Session session) { // if the MessageReceiver is closed - we no-longer need to create the link if (MessageReceiver.this.getIsClosingOrClosed()) { + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info( + String.format(Locale.US, + "clientId[%s], path[%s], canceling the job of creating a receive link because the receiver was closed", + getClientId(), receivePath)); + } session.close(); return; @@ -529,7 +550,7 @@ public void accept(Session session) { if (desiredCapabilities != null) { receiver.setDesiredCapabilities(desiredCapabilities); } - final ReceiveLinkHandler handler = new ReceiveLinkHandler(MessageReceiver.this); + final ReceiveLinkHandler handler = new ReceiveLinkHandler(MessageReceiver.this, MessageReceiver.this.getClientId()); BaseHandler.setHandler(receiver, handler); if (MessageReceiver.this.receiveLink != null) { @@ -609,18 +630,23 @@ private Message pollPrefetchQueue() { private void sendFlow(final int credits) { // slow down sending the flow - to make the protocol less-chat'y this.nextCreditToFlow += credits; - if (this.nextCreditToFlow >= this.prefetchCount || this.nextCreditToFlow >= 100) { + if (this.shouldSendFlow()) { final int tempFlow = this.nextCreditToFlow; this.receiveLink.flow(tempFlow); this.nextCreditToFlow = 0; if (TRACE_LOGGER.isDebugEnabled()) { - TRACE_LOGGER.debug(String.format("clientId[%s], receiverPath[%s], linkName[%s], updated-link-credit[%s], sentCredits[%s], ThreadId[%s]", - this.getClientId(), this.receivePath, this.receiveLink.getName(), this.receiveLink.getCredit(), tempFlow, Thread.currentThread().getId())); + TRACE_LOGGER.debug(String.format(Locale.US, "clientId[%s], receiverPath[%s], linkName[%s], updated-link-credit[%s], sentCredits[%s], ThreadId[%s]", + this.getClientId(), this.receivePath, this.getReceiveLinkName(), this.receiveLink.getCredit(), tempFlow, Thread.currentThread().getId())); } } } + private boolean shouldSendFlow() { + return (this.nextCreditToFlow > 0 && this.nextCreditToFlow >= (this.prefetchCount / 2)) + || (this.nextCreditToFlow >= 100); + } + private void scheduleLinkOpenTimeout(final TimeoutTracker timeout) { // timer to signal a timeout if exceeds the operationTimeout on MessagingFactory this.openTimer = timer.schedule( @@ -815,7 +841,7 @@ public void onEvent() { receiveWork.onEvent(); if (!MessageReceiver.this.getIsClosingOrClosed() - && (receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)) { + && (receiveLink == null || receiveLink.getLocalState() == EndpointState.CLOSED || receiveLink.getRemoteState() == EndpointState.CLOSED)) { createReceiveLink(); } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageSender.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageSender.java index 1481baa7e6f6..248643813aab 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageSender.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessageSender.java @@ -131,7 +131,7 @@ public void onComplete(Void result) { if (TRACE_LOGGER.isDebugEnabled()) { TRACE_LOGGER.debug(String.format(Locale.US, "clientId[%s], path[%s], linkName[%s] - token renewed", - getClientId(), sendPath, sendLink.getName())); + getClientId(), sendPath, getSendLinkName())); } } @@ -140,7 +140,7 @@ public void onError(Exception error) { if (TRACE_LOGGER.isInfoEnabled()) { TRACE_LOGGER.info(String.format(Locale.US, "clientId[%s], path[%s], linkName[%s] - tokenRenewalFailure[%s]", - getClientId(), sendPath, sendLink.getName(), error.getMessage())); + getClientId(), sendPath, getSendLinkName(), error.getMessage())); } } }); @@ -148,7 +148,7 @@ public void onError(Exception error) { if (TRACE_LOGGER.isWarnEnabled()) { TRACE_LOGGER.warn(String.format(Locale.US, "clientId[%s], path[%s], linkName[%s] - tokenRenewalScheduleFailure[%s]", - getClientId(), sendPath, sendLink.getName(), exception.getMessage())); + getClientId(), sendPath, getSendLinkName(), exception.getMessage())); } } } @@ -266,6 +266,10 @@ private CompletableFuture send( return this.sendCore(bytes, arrayOffset, messageFormat, onSend, tracker, null, null); } + private String getSendLinkName() { + return this.sendLink == null ? "null" : this.sendLink.getName(); + } + public CompletableFuture send(final Iterable messages) { if (messages == null || IteratorUtil.sizeEquals(messages, 0)) { throw new IllegalArgumentException(String.format(Locale.US, @@ -359,8 +363,8 @@ public void onOpenComplete(Exception completionException) { this.cancelOpenTimer(); if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("onOpenComplete - clientId[%s], sendPath[%s], linkName[%s]", - this.getClientId(), this.sendPath, this.sendLink.getName())); + TRACE_LOGGER.info(String.format(Locale.US, "onOpenComplete - clientId[%s], sendPath[%s], linkName[%s]", + this.getClientId(), this.sendPath, this.getSendLinkName())); } if (!this.linkFirstOpen.isDone()) { @@ -508,7 +512,7 @@ public void onError(final Exception completionException) { @Override public void onEvent() { if (!MessageSender.this.getIsClosingOrClosed() - && (sendLink.getLocalState() == EndpointState.CLOSED || sendLink.getRemoteState() == EndpointState.CLOSED)) { + && (sendLink == null || sendLink.getLocalState() == EndpointState.CLOSED || sendLink.getRemoteState() == EndpointState.CLOSED)) { recreateSendLink(); } } @@ -543,7 +547,7 @@ public void onSendComplete(final Delivery delivery) { String.format( Locale.US, "clientId[%s], path[%s], linkName[%s], deliveryTag[%s]", - this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag)); + this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag)); } final ReplayableWorkItem pendingSendWorkItem = this.pendingSendsData.remove(deliveryTag); @@ -612,7 +616,7 @@ public void onEvent() { if (TRACE_LOGGER.isDebugEnabled()) { TRACE_LOGGER.debug( String.format(Locale.US, "clientId[%s]. path[%s], linkName[%s], delivery[%s] - mismatch (or send timed out)", - this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag)); + this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag)); } } } @@ -627,6 +631,13 @@ private void cleanupFailedSend(final ReplayableWorkItem failedSend, final private void createSendLink() { synchronized (this.errorConditionLock) { if (this.creatingLink) { + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info( + String.format(Locale.US, + "clientId[%s], path[%s], operationTimeout[%s], creating a send link is already in progress", + this.getClientId(), this.sendPath, this.operationTimeout)); + } + return; } @@ -663,7 +674,7 @@ public void accept(Session session) { sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); - final SendLinkHandler handler = new SendLinkHandler(MessageSender.this); + final SendLinkHandler handler = new SendLinkHandler(MessageSender.this, MessageSender.this.getClientId()); BaseHandler.setHandler(sender, handler); if (MessageSender.this.sendLink != null) { @@ -803,7 +814,7 @@ public void onFlow(final int creditIssued) { int numberOfSendsWaitingforCredit = this.pendingSends.size(); TRACE_LOGGER.debug(String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], remoteLinkCredit[%s], pendingSendsWaitingForCredit[%s], pendingSendsWaitingDelivery[%s]", - this.getClientId(), this.sendPath, this.sendLink.getName(), creditIssued, numberOfSendsWaitingforCredit, this.pendingSendsData.size() - numberOfSendsWaitingforCredit)); + this.getClientId(), this.sendPath, this.getSendLinkName(), creditIssued, numberOfSendsWaitingforCredit, this.pendingSendsData.size() - numberOfSendsWaitingforCredit)); } this.sendWork.onEvent(); @@ -816,10 +827,11 @@ private void recreateSendLink() { // actual send on the SenderLink should happen only in this method & should run on Reactor Thread private void processSendWork() { - if (this.sendLink.getLocalState() == EndpointState.CLOSED || this.sendLink.getRemoteState() == EndpointState.CLOSED) { + if (this.sendLink == null || this.sendLink.getLocalState() == EndpointState.CLOSED || this.sendLink.getRemoteState() == EndpointState.CLOSED) { if (!this.getIsClosingOrClosed()) { this.recreateSendLink(); } + return; } @@ -870,7 +882,7 @@ private void processSendWork() { if (TRACE_LOGGER.isDebugEnabled()) { TRACE_LOGGER.debug( String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], deliveryTag[%s], sentMessageSize[%s], payloadActualSize[%s] - sendlink advance failed", - this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag, sentMsgSize, sendData.getEncodedMessageSize())); + this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag, sentMsgSize, sendData.getEncodedMessageSize())); } if (delivery != null) { @@ -888,7 +900,7 @@ private void processSendWork() { if (TRACE_LOGGER.isDebugEnabled()) { TRACE_LOGGER.debug( String.format(Locale.US, "clientId[%s], path[%s], linkName[%s], deliveryTag[%s] - sendData not found for this delivery.", - this.getClientId(), this.sendPath, this.sendLink.getName(), deliveryTag)); + this.getClientId(), this.sendPath, this.getSendLinkName(), deliveryTag)); } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java index 8336b3b1afa4..2509288948c0 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/MessagingFactory.java @@ -10,6 +10,7 @@ import com.microsoft.azure.eventhubs.OperationCancelledException; import com.microsoft.azure.eventhubs.RetryPolicy; import com.microsoft.azure.eventhubs.TimeoutException; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.BaseHandler; import org.apache.qpid.proton.engine.Connection; @@ -72,7 +73,7 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti final RetryPolicy retryPolicy, final ScheduledExecutorService executor, final ReactorFactory reactorFactory) { - super("MessagingFactory".concat(StringUtil.getRandomString()), null, executor); + super(StringUtil.getRandomString("MF"), null, executor); this.hostName = builder.getEndpoint().getHost(); this.reactorFactory = reactorFactory; @@ -80,7 +81,7 @@ public final class MessagingFactory extends ClientEntity implements AmqpConnecti this.retryPolicy = retryPolicy; this.registeredLinks = new LinkedList<>(); this.reactorLock = new Object(); - this.connectionHandler = ConnectionHandler.create(builder.getTransportType(), this); + this.connectionHandler = ConnectionHandler.create(builder.getTransportType(), this, this.getClientId()); this.cbsChannelCreateLock = new Object(); this.mgmtChannelCreateLock = new Object(); this.tokenProvider = builder.getSharedAccessSignature() == null @@ -168,7 +169,7 @@ private void createConnection() throws IOException { } private void startReactor(final ReactorHandler reactorHandler) throws IOException { - final Reactor newReactor = this.reactorFactory.create(reactorHandler, this.connectionHandler.getMaxFrameSize()); + final Reactor newReactor = this.reactorFactory.create(reactorHandler, this.connectionHandler.getMaxFrameSize(), this.getClientId()); synchronized (this.reactorLock) { this.reactor = newReactor; this.reactorDispatcher = new ReactorDispatcher(newReactor); @@ -183,7 +184,7 @@ private void startReactor(final ReactorHandler reactorHandler) throws IOExceptio public CBSChannel getCBSChannel() { synchronized (this.cbsChannelCreateLock) { if (this.cbsChannel == null) { - this.cbsChannel = new CBSChannel(this, this); + this.cbsChannel = new CBSChannel(this, this, this.getClientId()); } } @@ -193,7 +194,7 @@ public CBSChannel getCBSChannel() { public ManagementChannel getManagementChannel() { synchronized (this.mgmtChannelCreateLock) { if (this.mgmtChannel == null) { - this.mgmtChannel = new ManagementChannel(this, this); + this.mgmtChannel = new ManagementChannel(this, this, this.getClientId()); } } @@ -203,11 +204,16 @@ public ManagementChannel getManagementChannel() { @Override public Session getSession(final String path, final Consumer onRemoteSessionOpen, final BiConsumer onRemoteSessionOpenError) { if (this.getIsClosingOrClosed()) { - onRemoteSessionOpenError.accept(null, new OperationCancelledException("underlying messagingFactory instance is closed")); return null; } + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info( + String.format(Locale.US, "messagingFactory[%s], hostName[%s], getting a session.", + getClientId(), getHostName())); + } + if (this.connection == null || this.connection.getLocalState() == EndpointState.CLOSED || this.connection.getRemoteState() == EndpointState.CLOSED) { this.connection = this.getReactor().connectionToHost( this.connectionHandler.getRemoteHostName(), @@ -216,7 +222,7 @@ public Session getSession(final String path, final Consumer onRemoteSes } final Session session = this.connection.session(); - BaseHandler.setHandler(session, new SessionHandler(path, onRemoteSessionOpen, onRemoteSessionOpenError, this.operationTimeout)); + BaseHandler.setHandler(session, new SessionHandler(path, onRemoteSessionOpen, onRemoteSessionOpenError, this.operationTimeout, this.getClientId())); session.open(); return session; @@ -251,7 +257,7 @@ public void onOpenComplete(Exception exception) { @Override public void onConnectionError(ErrorCondition error) { if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError: messagingFactory[%s], hostname[%s], error[%s]", + TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError messagingFactory[%s], hostname[%s], error[%s]", this.getClientId(), this.hostName, error != null ? error.getDescription() : "n/a")); @@ -259,7 +265,7 @@ public void onConnectionError(ErrorCondition error) { if (!this.open.isDone()) { if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError: messagingFactory[%s], hostname[%s], open hasn't complete, stopping the reactor", + TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError messagingFactory[%s], hostname[%s], open hasn't complete, stopping the reactor", this.getClientId(), this.hostName)); } @@ -272,9 +278,9 @@ public void onConnectionError(ErrorCondition error) { final List closedLinks = new LinkedList<>(); for (Link link : oldRegisteredLinksCopy) { - if (link.getLocalState() != EndpointState.CLOSED && link.getRemoteState() != EndpointState.CLOSED) { + if (link.getLocalState() != EndpointState.CLOSED) { if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError: messagingFactory[%s], hostname[%s], closing link [%s]", + TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError messagingFactory[%s], hostname[%s], closing link [%s]", this.getClientId(), this.hostName, link.getName())); } @@ -289,7 +295,7 @@ public void onConnectionError(ErrorCondition error) { // in connection recreation we depend on currentConnection state to evaluate need for recreation if (oldConnection.getLocalState() != EndpointState.CLOSED) { if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError: messagingFactory[%s], hostname[%s], closing current connection", + TRACE_LOGGER.warn(String.format(Locale.US, "onConnectionError messagingFactory[%s], hostname[%s], closing current connection", this.getClientId(), this.hostName)); } @@ -303,7 +309,7 @@ public void onConnectionError(ErrorCondition error) { for (Link link : closedLinks) { final Handler handler = BaseHandler.getHandler(link); - if (handler != null && handler instanceof BaseLinkHandler) { + if (handler instanceof BaseLinkHandler) { final BaseLinkHandler linkHandler = (BaseLinkHandler) handler; linkHandler.processOnClose(link, error); } @@ -349,17 +355,26 @@ private void onReactorError(Exception cause) { // below .close() calls (local closes). // But, we still need to change the states of these to Closed - so that subsequent retries - will // treat the links and connection as closed and re-establish them and continue running on new Reactor instance. - if (oldConnection.getLocalState() != EndpointState.CLOSED && oldConnection.getRemoteState() != EndpointState.CLOSED) { + ErrorCondition errorCondition = new ErrorCondition(Symbol.getSymbol("messagingfactory.onreactorerror"), cause.getMessage()); + if (oldConnection.getLocalState() != EndpointState.CLOSED) { + if (TRACE_LOGGER.isWarnEnabled()) { + TRACE_LOGGER.warn(String.format(Locale.US, "onReactorError: messagingFactory[%s], hostname[%s], closing current connection", + this.getClientId(), + this.hostName)); + } + + oldConnection.setCondition(errorCondition); oldConnection.close(); } for (final Link link : oldRegisteredLinksCopy) { - if (link.getLocalState() != EndpointState.CLOSED && link.getRemoteState() != EndpointState.CLOSED) { + if (link.getLocalState() != EndpointState.CLOSED) { + link.setCondition(errorCondition); link.close(); } final Handler handler = BaseHandler.getHandler(link); - if (handler != null && handler instanceof BaseLinkHandler) { + if (handler instanceof BaseLinkHandler) { final BaseLinkHandler linkHandler = (BaseLinkHandler) handler; linkHandler.processOnClose(link, cause); } @@ -415,8 +430,8 @@ public void scheduleOnReactorThread(final int delay, final DispatchHandler handl public static class ReactorFactory { - public Reactor create(final ReactorHandler reactorHandler, final int maxFrameSize) throws IOException { - return ProtonUtil.reactor(reactorHandler, maxFrameSize); + public Reactor create(final ReactorHandler reactorHandler, final int maxFrameSize, final String name) throws IOException { + return ProtonUtil.reactor(reactorHandler, maxFrameSize, name); } } @@ -611,6 +626,10 @@ public void run() { } private class ReactorHandlerWithConnection extends ReactorHandler { + ReactorHandlerWithConnection() { + super(getClientId()); + } + @Override public void onReactorInit(Event e) { super.onReactorInit(e); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java index e91230ad378c..92e1eb40e0a5 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionReceiverImpl.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Date; +import java.util.Locale; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -55,7 +56,7 @@ private PartitionReceiverImpl(MessagingFactory factory, final boolean isEpochReceiver, final ReceiverOptions receiverOptions, final ScheduledExecutorService executor) { - super("PartitionReceiverImpl".concat(StringUtil.getRandomString()), null, executor); + super(StringUtil.getRandomString("PR").concat(StringUtil.SEPARATOR + factory.getClientId()), null, executor); this.underlyingFactory = factory; this.eventHubName = eventHubName; @@ -104,7 +105,7 @@ public PartitionReceiver apply(Void a) { private CompletableFuture createInternalReceiver() { return MessageReceiver.create(this.underlyingFactory, this.getClientId().concat("-InternalReceiver"), - String.format("%s/ConsumerGroups/%s/Partitions/%s", this.eventHubName, this.consumerGroupName, this.partitionId), + String.format(Locale.US, "%s/ConsumerGroups/%s/Partitions/%s", this.eventHubName, this.consumerGroupName, this.partitionId), this.receiverOptions.getPrefetchCount(), this) .thenAcceptAsync(new Consumer() { public void accept(MessageReceiver r) { @@ -248,7 +249,7 @@ public Map getFilter(final Message lastReceivedMes } else { logReceivePath = "receiverPath[" + this.internalReceiver.getReceivePath() + "]"; } - TRACE_LOGGER.info(String.format("%s, action[createReceiveLink], %s", logReceivePath, this.eventPosition)); + TRACE_LOGGER.info(String.format(Locale.US, "%s, action[createReceiveLink], %s", logReceivePath, this.eventPosition)); } return Collections.singletonMap(AmqpConstants.STRING_FILTER, new UnknownDescribedType(AmqpConstants.STRING_FILTER, expression)); diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionSenderImpl.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionSenderImpl.java index bbde282f1f4e..86bf875d7728 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionSenderImpl.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/PartitionSenderImpl.java @@ -9,6 +9,7 @@ import com.microsoft.azure.eventhubs.EventHubException; import com.microsoft.azure.eventhubs.PartitionSender; +import java.util.Locale; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; @@ -22,7 +23,7 @@ final class PartitionSenderImpl extends ClientEntity implements PartitionSender private volatile MessageSender internalSender; private PartitionSenderImpl(final MessagingFactory factory, final String eventHubName, final String partitionId, final ScheduledExecutorService executor) { - super("PartitionSenderImpl".concat(StringUtil.getRandomString()), null, executor); + super(StringUtil.getRandomString("PS").concat(StringUtil.SEPARATOR + factory.getClientId()), null, executor); this.partitionId = partitionId; this.eventHubName = eventHubName; @@ -44,7 +45,7 @@ public PartitionSender apply(Void a) { private CompletableFuture createInternalSender() throws EventHubException { return MessageSender.create(this.factory, this.getClientId().concat("-InternalSender"), - String.format("%s/Partitions/%s", this.eventHubName, this.partitionId)) + String.format(Locale.US, "%s/Partitions/%s", this.eventHubName, this.partitionId)) .thenAcceptAsync(new Consumer() { public void accept(MessageSender a) { PartitionSenderImpl.this.internalSender = a; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ProtonUtil.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ProtonUtil.java index caa69d739358..b375f4afe7dc 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ProtonUtil.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ProtonUtil.java @@ -14,14 +14,14 @@ public final class ProtonUtil { private ProtonUtil() { } - public static Reactor reactor(final ReactorHandler reactorHandler, final int maxFrameSize) throws IOException { + public static Reactor reactor(final ReactorHandler reactorHandler, final int maxFrameSize, final String name) throws IOException { final ReactorOptions reactorOptions = new ReactorOptions(); reactorOptions.setMaxFrameSize(maxFrameSize); reactorOptions.setEnableSaslByDefault(true); final Reactor reactor = Proton.reactor(reactorOptions, reactorHandler); - reactor.setGlobalHandler(new CustomIOHandler()); + reactor.setGlobalHandler(new CustomIOHandler(name)); return reactor; } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReactorHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReactorHandler.java index 6e8a8ce7657b..c8a5670e41f3 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReactorHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReactorHandler.java @@ -9,12 +9,20 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Locale; + public class ReactorHandler extends BaseHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ReactorHandler.class); + private final String name; + private ReactorDispatcher reactorDispatcher; + public ReactorHandler(final String name) { + this.name = name; + } + public ReactorDispatcher getReactorDispatcher() { return this.reactorDispatcher; } @@ -26,8 +34,7 @@ public void unsafeSetReactorDispatcher(final ReactorDispatcher reactorDispatcher @Override public void onReactorInit(Event e) { - - TRACE_LOGGER.info("reactor.onReactorInit"); + TRACE_LOGGER.info(String.format(Locale.US, "name[%s] reactor.onReactorInit", this.name)); final Reactor reactor = e.getReactor(); reactor.setTimeout(ClientConstants.REACTOR_IO_POLL_TIMEOUT); @@ -35,7 +42,6 @@ public void onReactorInit(Event e) { @Override public void onReactorFinal(Event e) { - - TRACE_LOGGER.info("reactor.onReactorFinal"); + TRACE_LOGGER.info(String.format(Locale.US, "name[%s] reactor.onReactorFinal", this.name)); } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiveLinkHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiveLinkHandler.java index e3d56bdd09e5..bc2ed56d53a0 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiveLinkHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceiveLinkHandler.java @@ -12,18 +12,18 @@ import java.util.Locale; -// ServiceBus <-> ProtonReactor interaction -// handles all recvLink - reactor events public final class ReceiveLinkHandler extends BaseLinkHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ReceiveLinkHandler.class); private final AmqpReceiver amqpReceiver; + private final String receiverName; private final Object firstResponse; private boolean isFirstResponse; - public ReceiveLinkHandler(final AmqpReceiver receiver) { - super(receiver); + public ReceiveLinkHandler(final AmqpReceiver receiver, final String receiverName) { + super(receiver, receiverName); this.amqpReceiver = receiver; + this.receiverName = receiverName; this.firstResponse = new Object(); this.isFirstResponse = true; } @@ -32,11 +32,9 @@ public ReceiveLinkHandler(final AmqpReceiver receiver) { public void onLinkLocalOpen(Event evt) { Link link = evt.getLink(); if (link instanceof Receiver) { - Receiver receiver = (Receiver) link; - if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info( - String.format("onLinkLocalOpen linkName[%s], localSource[%s]", receiver.getName(), receiver.getSource())); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkLocalOpen receiverName[%s], linkName[%s], localSource[%s]", + this.receiverName, link.getName(), link.getSource())); } } } @@ -45,11 +43,10 @@ public void onLinkLocalOpen(Event evt) { public void onLinkRemoteOpen(Event event) { Link link = event.getLink(); if (link instanceof Receiver) { - Receiver receiver = (Receiver) link; if (link.getRemoteSource() != null) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteOpen linkName[%s], remoteSource[%s]", - receiver.getName(), link.getRemoteSource())); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteOpen receiverName[%s], linkName[%s], remoteSource[%s]", + this.receiverName, link.getName(), link.getRemoteSource())); } synchronized (this.firstResponse) { @@ -58,9 +55,8 @@ public void onLinkRemoteOpen(Event event) { } } else { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info( - String.format(Locale.US, "onLinkRemoteOpen linkName[%s], remoteTarget[null], " - + "remoteSource[null], action[waitingForError]", receiver.getName())); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteOpen receiverName[%s], linkName[%s], action[waitingForError]", + this.receiverName, link.getName())); } } } @@ -90,9 +86,9 @@ public void onDelivery(Event event) { if (TRACE_LOGGER.isWarnEnabled()) { TRACE_LOGGER.warn( receiveLink != null - ? String.format(Locale.US, "onDelivery linkName[%s], updatedLinkCredit[%s], remoteCredit[%s], " + ? String.format(Locale.US, "onDelivery receiverName[%s], linkName[%s], updatedLinkCredit[%s], remoteCredit[%s], " + "remoteCondition[%s], delivery.isSettled[%s]", - receiveLink.getName(), receiveLink.getCredit(), receiveLink.getRemoteCredit(), receiveLink.getRemoteCondition(), delivery.isSettled()) + this.receiverName, receiveLink.getName(), receiveLink.getCredit(), receiveLink.getRemoteCredit(), receiveLink.getRemoteCondition(), delivery.isSettled()) : String.format(Locale.US, "delivery.isSettled[%s]", delivery.isSettled())); } } else { @@ -102,9 +98,9 @@ public void onDelivery(Event event) { if (TRACE_LOGGER.isTraceEnabled() && receiveLink != null) { TRACE_LOGGER.trace( - String.format(Locale.US, "onDelivery linkName[%s], updatedLinkCredit[%s], remoteCredit[%s], " - + "remoteCondition[%s], delivery.isPartial[%s]", - receiveLink.getName(), receiveLink.getCredit(), receiveLink.getRemoteCredit(), receiveLink.getRemoteCondition(), delivery.isPartial())); + String.format(Locale.US, "onDelivery receiverName[%s], linkName[%s], updatedLinkCredit[%s], remoteCredit[%s], " + + "remoteCondition[%s], delivery.isPartial[%s]", + this.receiverName, receiveLink.getName(), receiveLink.getCredit(), receiveLink.getRemoteCredit(), receiveLink.getRemoteCondition(), delivery.isPartial())); } } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceivePump.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceivePump.java index 4430ce551295..b4c8f5d98088 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceivePump.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/ReceivePump.java @@ -8,6 +8,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Locale; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; @@ -55,8 +56,8 @@ public void run() { } catch (final Exception exception) { if (TRACE_LOGGER.isErrorEnabled()) { TRACE_LOGGER.error( - String.format("Receive pump for eventHub (%s), consumerGroup (%s), partition (%s) " - + "encountered unrecoverable error and exited with exception %s.", + String.format(Locale.US, "Receive pump for eventHub (%s), consumerGroup (%s), partition (%s) " + + "encountered unrecoverable error and exited with exception %s.", this.eventHubName, this.consumerGroupName, this.receiver.getPartitionId(), exception.toString())); } @@ -71,7 +72,7 @@ public void receiveAndProcess() { .handleAsync(this.processAndReschedule, this.executor); } else { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("Stopping receive pump for eventHub (%s), consumerGroup (%s), partition (%s) as %s", + TRACE_LOGGER.info(String.format(Locale.US, "Stopping receive pump for eventHub (%s), consumerGroup (%s), partition (%s) as %s", this.eventHubName, this.consumerGroupName, this.receiver.getPartitionId(), this.stopPumpRaised.get() ? "per the request." : "pump ran into errors.")); } @@ -111,7 +112,7 @@ private void handleUserCodeExceptions(final Throwable userCodeException) { this.isPumpHealthy = false; if (TRACE_LOGGER.isErrorEnabled()) { TRACE_LOGGER.error( - String.format("Receive pump for eventHub (%s), consumerGroup (%s), partition (%s) " + String.format(Locale.US, "Receive pump for eventHub (%s), consumerGroup (%s), partition (%s) " + "exiting after user-code exception %s", this.eventHubName, this.consumerGroupName, this.receiver.getPartitionId(), userCodeException.toString())); } @@ -120,7 +121,7 @@ private void handleUserCodeExceptions(final Throwable userCodeException) { if (userCodeException instanceof InterruptedException) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("Interrupting receive pump for eventHub (%s), consumerGroup (%s), partition (%s)", + TRACE_LOGGER.info(String.format(Locale.US, "Interrupting receive pump for eventHub (%s), consumerGroup (%s), partition (%s)", this.eventHubName, this.consumerGroupName, this.receiver.getPartitionId())); } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java index e19fa8e6f60b..fd3191acae16 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseChannel.java @@ -57,7 +57,7 @@ public RequestResponseChannel( this.sendLink.setTarget(target); sendLink.setSource(new Source()); this.sendLink.setSenderSettleMode(SenderSettleMode.SETTLED); - BaseHandler.setHandler(this.sendLink, new SendLinkHandler(new RequestHandler())); + BaseHandler.setHandler(this.sendLink, new SendLinkHandler(new RequestHandler(), linkName)); this.receiveLink = session.receiver(linkName + ":receiver"); final Source source = new Source(); @@ -68,7 +68,7 @@ public RequestResponseChannel( this.receiveLink.setTarget(receiverTarget); this.receiveLink.setSenderSettleMode(SenderSettleMode.SETTLED); this.receiveLink.setReceiverSettleMode(ReceiverSettleMode.SECOND); - BaseHandler.setHandler(this.receiveLink, new ReceiveLinkHandler(new ResponseHandler())); + BaseHandler.setHandler(this.receiveLink, new ReceiveLinkHandler(new ResponseHandler(), linkName)); } // open should be called only once - we use FaultTolerantObject for that diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseCloser.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseCloser.java index 99d56495fccf..1403a700cbb2 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseCloser.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseCloser.java @@ -20,7 +20,9 @@ public void run(OperationResult closeOperationCallback) { if (channelToBeClosed == null) { closeOperationCallback.onComplete(null); } else { - channelToBeClosed.close(new OperationResultBase<>(closeOperationCallback::onComplete, closeOperationCallback::onError)); + channelToBeClosed.close(new OperationResultBase<>( + closeOperationCallback::onComplete, + closeOperationCallback::onError)); } } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java index a6799464fb0e..a7e5d22fcd15 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/RequestResponseOpener.java @@ -4,17 +4,27 @@ package com.microsoft.azure.eventhubs.impl; import org.apache.qpid.proton.engine.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Locale; public class RequestResponseOpener implements Operation { + private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(RequestResponseOpener.class); + private final SessionProvider sessionProvider; + private final String clientId; private final String sessionName; private final String linkName; private final String endpointAddress; private final AmqpConnection eventDispatcher; - public RequestResponseOpener(final SessionProvider sessionProvider, final String sessionName, final String linkName, + private boolean isOpened; + + public RequestResponseOpener(final SessionProvider sessionProvider, final String clientId, final String sessionName, final String linkName, final String endpointAddress, final AmqpConnection eventDispatcher) { this.sessionProvider = sessionProvider; + this.clientId = clientId; this.sessionName = sessionName; this.linkName = linkName; this.endpointAddress = endpointAddress; @@ -22,7 +32,10 @@ public RequestResponseOpener(final SessionProvider sessionProvider, final String } @Override - public void run(OperationResult operationCallback) { + public synchronized void run(OperationResult operationCallback) { + if (this.isOpened) { + return; + } final Session session = this.sessionProvider.getSession( this.sessionName, @@ -51,11 +64,23 @@ public void onComplete(Void result) { eventDispatcher.registerForConnectionError(requestResponseChannel.getReceiveLink()); operationCallback.onComplete(requestResponseChannel); + + isOpened = true; + + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onOpen complete clientId[%s], session[%s], link[%s], endpoint[%s]", + clientId, sessionName, linkName, endpointAddress)); + } } @Override public void onError(Exception error) { operationCallback.onError(error); + + if (TRACE_LOGGER.isWarnEnabled()) { + TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onOpen error clientId[%s], session[%s], link[%s], endpoint[%s], error %s", + clientId, sessionName, linkName, endpointAddress, error)); + } } }, new OperationResult() { @@ -63,12 +88,24 @@ public void onError(Exception error) { public void onComplete(Void result) { eventDispatcher.deregisterForConnectionError(requestResponseChannel.getSendLink()); eventDispatcher.deregisterForConnectionError(requestResponseChannel.getReceiveLink()); + + isOpened = false; + + if (TRACE_LOGGER.isInfoEnabled()) { + TRACE_LOGGER.info(String.format(Locale.US, "requestResponseChannel.onClose complete clientId[%s], session[%s], link[%s], endpoint[%s]", + clientId, sessionName, linkName, endpointAddress)); + } } @Override public void onError(Exception error) { eventDispatcher.deregisterForConnectionError(requestResponseChannel.getSendLink()); eventDispatcher.deregisterForConnectionError(requestResponseChannel.getReceiveLink()); + + if (TRACE_LOGGER.isWarnEnabled()) { + TRACE_LOGGER.warn(String.format(Locale.US, "requestResponseChannel.onClose error clientId[%s], session[%s], link[%s], endpoint[%s], error %s", + clientId, sessionName, linkName, endpointAddress, error)); + } } }); } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SendLinkHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SendLinkHandler.java index f2220ef1d7df..55949b61306b 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SendLinkHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SendLinkHandler.java @@ -10,20 +10,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.charset.StandardCharsets; import java.util.Locale; import java.util.concurrent.atomic.AtomicBoolean; -import static java.nio.charset.StandardCharsets.UTF_8; - public class SendLinkHandler extends BaseLinkHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(SendLinkHandler.class); private final AmqpSender msgSender; + private final String senderName; private AtomicBoolean isFirstFlow; - public SendLinkHandler(final AmqpSender sender) { - super(sender); + public SendLinkHandler(final AmqpSender sender, final String senderName) { + super(sender, senderName); this.msgSender = sender; + this.senderName = senderName; this.isFirstFlow = new AtomicBoolean(true); } @@ -31,9 +32,9 @@ public SendLinkHandler(final AmqpSender sender) { public void onLinkLocalOpen(Event event) { Link link = event.getLink(); if (link instanceof Sender) { - Sender sender = (Sender) link; if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format("onLinkLocalOpen linkName[%s], localTarget[%s]", sender.getName(), sender.getTarget())); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkLocalOpen senderName[%s], linkName[%s], localTarget[%s]", + this.senderName, link.getName(), link.getTarget())); } } } @@ -42,20 +43,19 @@ public void onLinkLocalOpen(Event event) { public void onLinkRemoteOpen(Event event) { Link link = event.getLink(); if (link instanceof Sender) { - Sender sender = (Sender) link; if (link.getRemoteTarget() != null) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteOpen linkName[%s], remoteTarget[%s]", sender.getName(), link.getRemoteTarget())); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteOpen senderName[%s], linkName[%s], remoteTarget[%s]", + this.senderName, link.getName(), link.getRemoteTarget())); } if (this.isFirstFlow.compareAndSet(true, false)) { this.msgSender.onOpenComplete(null); } - } else { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info( - String.format(Locale.US, "onLinkRemoteOpen linkName[%s], remoteTarget[null], remoteSource[null], action[waitingForError]", sender.getName())); + TRACE_LOGGER.info(String.format(Locale.US, "onLinkRemoteOpen senderName[%s], linkName[%s], remoteTarget[null], remoteSource[null], action[waitingForError]", + this.senderName, link.getName())); } } } @@ -69,10 +69,8 @@ public void onDelivery(Event event) { Sender sender = (Sender) delivery.getLink(); if (TRACE_LOGGER.isTraceEnabled()) { - TRACE_LOGGER.trace( - "onDelivery linkName[" + sender.getName() - + "], unsettled[" + sender.getUnsettled() + "], credit[" + sender.getRemoteCredit() + "], deliveryState[" + delivery.getRemoteState() - + "], delivery.isBuffered[" + delivery.isBuffered() + "], delivery.id[" + new String(delivery.getTag(), UTF_8) + "]"); + TRACE_LOGGER.trace(String.format(Locale.US, "onDelivery senderName[%s], linkName[%s], unsettled[%s], credit[%s], deliveryState[%s], delivery.isBuffered[%s], delivery.id[%s]", + this.senderName, sender.getName(), sender.getUnsettled(), sender.getRemoteCredit(), delivery.getRemoteState(), delivery.isBuffered(), new String(delivery.getTag(), StandardCharsets.UTF_8))); } msgSender.onSendComplete(delivery); @@ -92,7 +90,8 @@ public void onLinkFlow(Event event) { this.msgSender.onFlow(sender.getRemoteCredit()); if (TRACE_LOGGER.isDebugEnabled()) { - TRACE_LOGGER.debug("onLinkFlow linkName[" + sender.getName() + "], unsettled[" + sender.getUnsettled() + "], credit[" + sender.getCredit() + "]"); + TRACE_LOGGER.debug(String.format(Locale.US, "onLinkFlow senderName[%s], linkName[%s], unsettled[%s], credit[%s]", + this.senderName, sender.getName(), sender.getUnsettled(), sender.getCredit())); } } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionHandler.java index 15d29e9a962f..3818fcdacb7a 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/SessionHandler.java @@ -28,6 +28,7 @@ public class SessionHandler extends BaseHandler { private final Consumer onRemoteSessionOpen; private final BiConsumer onRemoteSessionOpenError; private final Duration openTimeout; + private final String connectionId; private boolean sessionCreated = false; private boolean sessionOpenErrorDispatched = false; @@ -35,18 +36,20 @@ public class SessionHandler extends BaseHandler { public SessionHandler(final String entityName, final Consumer onRemoteSessionOpen, final BiConsumer onRemoteSessionOpenError, - final Duration openTimeout) { + final Duration openTimeout, + final String connectionId) { this.entityName = entityName; this.onRemoteSessionOpenError = onRemoteSessionOpenError; this.onRemoteSessionOpen = onRemoteSessionOpen; this.openTimeout = openTimeout; + this.connectionId = connectionId; } @Override public void onSessionLocalOpen(Event e) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onSessionLocalOpen entityName[%s], condition[%s]", this.entityName, - e.getSession().getCondition() == null ? "none" : e.getSession().getCondition().toString())); + TRACE_LOGGER.info(String.format(Locale.US, "onSessionLocalOpen connectionId[%s], entityName[%s], condition[%s]", + this.connectionId, this.entityName, e.getSession().getCondition() == null ? "none" : e.getSession().getCondition().toString())); } if (this.onRemoteSessionOpenError != null) { @@ -66,8 +69,7 @@ public void onSessionLocalOpen(Event e) { null, new EventHubException( false, - String.format("OnSessionLocalOpen entityName[%s], reactorHandler: NULL POINTER exception.") - ) + String.format("OnSessionLocalOpen entityName[%s], reactorHandler: NULL POINTER exception.", this.entityName)) ); e.getSession().close(); return; @@ -77,11 +79,11 @@ public void onSessionLocalOpen(Event e) { final Session session = e.getSession(); try { - reactorDispatcher.invoke((int) this.openTimeout.toMillis(), new SessionTimeoutHandler(session, entityName)); + reactorDispatcher.invoke((int) this.openTimeout.toMillis(), new SessionTimeoutHandler(entityName, connectionId)); } catch (IOException ioException) { if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn(String.format(Locale.US, "onSessionLocalOpen entityName[%s], reactorDispatcherError[%s]", - this.entityName, ioException.getMessage())); + TRACE_LOGGER.warn(String.format(Locale.US, "onSessionLocalOpen connectionId[%s], entityName[%s], reactorDispatcherError[%s]", + this.connectionId, this.entityName, ioException.getMessage())); } session.close(); @@ -89,8 +91,8 @@ public void onSessionLocalOpen(Event e) { null, new EventHubException( false, - String.format("onSessionLocalOpen entityName[%s], underlying IO of reactorDispatcher faulted with error: %s", - this.entityName, ioException.getMessage()), ioException)); + String.format(Locale.US, "onSessionLocalOpen connectionId[%s], entityName[%s], underlying IO of reactorDispatcher faulted with error: %s", + this.connectionId, this.entityName, ioException.getMessage()), ioException)); } } } @@ -98,8 +100,8 @@ public void onSessionLocalOpen(Event e) { @Override public void onSessionRemoteOpen(Event e) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onSessionRemoteOpen entityName[%s], sessionIncCapacity[%s], sessionOutgoingWindow[%s]", - this.entityName, e.getSession().getIncomingCapacity(), e.getSession().getOutgoingWindow())); + TRACE_LOGGER.info(String.format(Locale.US, "onSessionRemoteOpen connectionId[%s], entityName[%s], sessionIncCapacity[%s], sessionOutgoingWindow[%s]", + this.connectionId, this.entityName, e.getSession().getIncomingCapacity(), e.getSession().getOutgoingWindow())); } final Session session = e.getSession(); @@ -116,16 +118,16 @@ public void onSessionRemoteOpen(Event e) { @Override public void onSessionLocalClose(Event e) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onSessionLocalClose entityName[%s], condition[%s]", this.entityName, - e.getSession().getCondition() == null ? "none" : e.getSession().getCondition().toString())); + TRACE_LOGGER.info(String.format(Locale.US, "onSessionLocalClose connectionId[%s], entityName[%s], condition[%s]", this.entityName, + this.connectionId, e.getSession().getCondition() == null ? "none" : e.getSession().getCondition().toString())); } } @Override public void onSessionRemoteClose(Event e) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onSessionRemoteClose entityName[%s], condition[%s]", this.entityName, - e.getSession().getRemoteCondition() == null ? "none" : e.getSession().getRemoteCondition().toString())); + TRACE_LOGGER.info(String.format(Locale.US, "onSessionRemoteClose connectionId[%s], entityName[%s], condition[%s]", this.entityName, + this.connectionId, e.getSession().getRemoteCondition() == null ? "none" : e.getSession().getRemoteCondition().toString())); } final Session session = e.getSession(); @@ -133,10 +135,8 @@ public void onSessionRemoteClose(Event e) { if (session != null && session.getLocalState() != EndpointState.CLOSED) { if (TRACE_LOGGER.isInfoEnabled()) { - TRACE_LOGGER.info(String.format(Locale.US, "onSessionRemoteClose closing a local session for entityName[%s], condition[%s], description[%s]", - this.entityName, - condition != null ? condition.getCondition() : "n/a", - condition != null ? condition.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onSessionRemoteClose closing a local session for connectionId[%s], entityName[%s], condition[%s], description[%s]", + this.connectionId, this.entityName, condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a")); } session.setCondition(session.getRemoteCondition()); @@ -155,19 +155,19 @@ public void onSessionFinal(Event e) { final Session session = e.getSession(); ErrorCondition condition = session != null ? session.getCondition() : null; - TRACE_LOGGER.info(String.format(Locale.US, "onSessionFinal entityName[%s], condition[%s], description[%s]", - this.entityName, - condition != null ? condition.getCondition() : "n/a", - condition != null ? condition.getDescription() : "n/a")); + TRACE_LOGGER.info(String.format(Locale.US, "onSessionFinal connectionId[%s], entityName[%s], condition[%s], description[%s]", + this.connectionId, this.entityName, condition != null ? condition.getCondition() : "n/a", condition != null ? condition.getDescription() : "n/a")); } } private class SessionTimeoutHandler extends DispatchHandler { private final String entityName; + private final String connectionId; - SessionTimeoutHandler(final Session session, final String entityName) { + SessionTimeoutHandler(final String entityName, final String connectionId) { this.entityName = entityName; + this.connectionId = connectionId; } @Override @@ -181,8 +181,8 @@ public void onEvent() { if (!sessionCreated && !sessionOpenErrorDispatched) { if (TRACE_LOGGER.isWarnEnabled()) { - TRACE_LOGGER.warn(String.format(Locale.US, "SessionTimeoutHandler.onEvent - entityName[%s], session open timed out.", - this.entityName)); + TRACE_LOGGER.warn(String.format(Locale.US, "SessionTimeoutHandler.onEvent - connectionId[%s], entityName[%s], session open timed out.", + this.connectionId, this.entityName)); } } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/StringUtil.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/StringUtil.java index 6660d8d4289f..d55792e667ee 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/StringUtil.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/StringUtil.java @@ -3,10 +3,13 @@ package com.microsoft.azure.eventhubs.impl; +import java.time.Instant; +import java.util.Locale; import java.util.UUID; public final class StringUtil { public static final String EMPTY = ""; + public static final String SEPARATOR = "_"; public static boolean isNullOrEmpty(String string) { return (string == null || string.isEmpty()); @@ -25,7 +28,7 @@ public static boolean isNullOrWhiteSpace(String string) { return true; } - public static String getRandomString() { - return UUID.randomUUID().toString().substring(0, 6); + public static String getRandomString(String prefix) { + return String.format(Locale.US, "%s_%s_%s", prefix, UUID.randomUUID().toString().substring(0, 6), Instant.now().toEpochMilli()); } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TimeoutTracker.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TimeoutTracker.java index 8a52e6bc2883..eb63d0b4af8d 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TimeoutTracker.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TimeoutTracker.java @@ -3,7 +3,6 @@ package com.microsoft.azure.eventhubs.impl; - import java.time.Duration; import java.time.Instant; diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TrackingUtil.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TrackingUtil.java index 74cde83b1ff4..fe40e2eb68a8 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TrackingUtil.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/TrackingUtil.java @@ -5,34 +5,18 @@ import org.apache.qpid.proton.engine.Session; -import java.time.Instant; - public final class TrackingUtil { public static final String TRACKING_ID_TOKEN_SEPARATOR = "_"; private TrackingUtil() { } - /** - * parses ServiceBus role identifiers from trackingId - * - * @return null if no roleIdentifier found - */ - static String parseRoleIdentifier(final String trackingId) { - if (StringUtil.isNullOrWhiteSpace(trackingId) || !trackingId.contains(TRACKING_ID_TOKEN_SEPARATOR)) { - return null; - } - - return trackingId.substring(trackingId.indexOf(TRACKING_ID_TOKEN_SEPARATOR)); - } - public static String getLinkName(final Session session) { - // returned linkName lookslike: ea9cac_8b_G27_1479943074829 - final String linkNamePrefix = StringUtil.getRandomString(); - final String linkNameWithServiceRoleTracker = session.getConnection() != null && !StringUtil.isNullOrEmpty(session.getConnection().getRemoteContainer()) + // LN_1479943074829_ea9cac_8b_G27 + final String linkNamePrefix = StringUtil.getRandomString("LN"); + return session.getConnection() != null && !StringUtil.isNullOrEmpty(session.getConnection().getRemoteContainer()) ? linkNamePrefix.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(session.getConnection().getRemoteContainer() - .substring(Math.max(session.getConnection().getRemoteContainer().length() - 7, 0), session.getConnection().getRemoteContainer().length())) + .substring(Math.max(session.getConnection().getRemoteContainer().length() - 7, 0))) : linkNamePrefix; - return linkNameWithServiceRoleTracker.concat(TrackingUtil.TRACKING_ID_TOKEN_SEPARATOR).concat(String.valueOf(Instant.now().toEpochMilli())); } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java index 5c97f42b71b2..51ccec9b5cd7 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketConnectionHandler.java @@ -13,7 +13,7 @@ public class WebSocketConnectionHandler extends ConnectionHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(WebSocketConnectionHandler.class); public WebSocketConnectionHandler(AmqpConnection amqpConnection) { - super(amqpConnection); + super(amqpConnection, StringUtil.getRandomString("WS")); } @Override diff --git a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java index 7b1a172895ff..e7d512bcd3a3 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java +++ b/eventhubs/data-plane/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/impl/WebSocketProxyConnectionHandler.java @@ -6,7 +6,6 @@ import com.microsoft.azure.proton.transport.proxy.ProxyHandler; import com.microsoft.azure.proton.transport.proxy.impl.ProxyHandlerImpl; import com.microsoft.azure.proton.transport.proxy.impl.ProxyImpl; - import org.apache.qpid.proton.amqp.transport.ConnectionError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Connection; @@ -17,18 +16,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.Authenticator; import java.net.InetSocketAddress; -import java.net.PasswordAuthentication; import java.net.Proxy; import java.net.ProxySelector; import java.net.URI; -import java.util.Base64; -import java.util.HashMap; import java.util.List; -import java.util.Map; - -import static java.nio.charset.StandardCharsets.UTF_8; public class WebSocketProxyConnectionHandler extends WebSocketConnectionHandler { private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(WebSocketProxyConnectionHandler.class); @@ -59,8 +51,7 @@ protected void addTransportLayers(final Event event, final TransportInternal tra // after creating the socket to proxy final String hostName = event.getConnection().getHostname(); final ProxyHandler proxyHandler = new ProxyHandlerImpl(); - final Map proxyHeader = getAuthorizationHeader(); - proxy.configure(hostName, proxyHeader, proxyHandler, transport); + proxy.configure(hostName, null, proxyHandler, transport); transport.addTransportLayer(proxy); @@ -120,38 +111,6 @@ public int getRemotePort() { return socketAddress.getPort(); } - private Map getAuthorizationHeader() { - final PasswordAuthentication authentication = Authenticator.requestPasswordAuthentication( - getRemoteHostName(), - null, - getRemotePort(), - null, - null, - "http", - null, - Authenticator.RequestorType.PROXY); - if (authentication == null) { - return null; - } - - final String proxyUserName = authentication.getUserName(); - final String proxyPassword = authentication.getPassword() != null - ? new String(authentication.getPassword()) - : null; - if (StringUtil.isNullOrEmpty(proxyUserName) - || StringUtil.isNullOrEmpty(proxyPassword)) { - return null; - } - - final HashMap proxyAuthorizationHeader = new HashMap<>(); - // https://tools.ietf.org/html/rfc7617 - final String usernamePasswordPair = proxyUserName + ":" + proxyPassword; - proxyAuthorizationHeader.put( - "Proxy-Authorization", - "Basic " + Base64.getEncoder().encodeToString(usernamePasswordPair.getBytes(UTF_8))); - return proxyAuthorizationHeader; - } - private InetSocketAddress getProxyAddress() { final URI serviceUri = createURIFromHostNamePort( this.getAmqpConnection().getHostName(), diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java index 8d048b08dfdf..ecb3278fafd2 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/MsgFactoryOpenCloseTest.java @@ -3,10 +3,10 @@ package com.microsoft.azure.eventhubs.exceptioncontracts; +import com.microsoft.azure.eventhubs.CommunicationException; import com.microsoft.azure.eventhubs.ConnectionStringBuilder; import com.microsoft.azure.eventhubs.EventData; import com.microsoft.azure.eventhubs.EventHubClient; -import com.microsoft.azure.eventhubs.EventHubException; import com.microsoft.azure.eventhubs.EventPosition; import com.microsoft.azure.eventhubs.PartitionReceiveHandler; import com.microsoft.azure.eventhubs.PartitionReceiver; @@ -17,9 +17,9 @@ import com.microsoft.azure.eventhubs.lib.TestContext; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.Test; +import java.time.Duration; import java.time.Instant; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -115,7 +115,6 @@ public void onError(Throwable error) { } } - @Ignore("TODO: Investigate testcase failure.") @Test() public void verifyThreadReleaseOnMsgFactoryOpenError() throws Exception { @@ -133,12 +132,14 @@ public void verifyThreadReleaseOnMsgFactoryOpenError() throws Exception { openFuture.get(); Assert.fail(); } catch (ExecutionException error) { - Assert.assertEquals(EventHubException.class, error.getCause().getClass()); + Assert.assertEquals(CommunicationException.class, error.getCause().getClass()); } - Thread.sleep(1000); // for reactor to transition from cleanup to complete-stop + // Waiting for reactor to transition from cleanup to complete-stop, this requires at least 60 seconds until + // the items are emptied. + Thread.sleep(Duration.ofSeconds(90).toMillis()); - Assert.assertEquals(((ScheduledThreadPoolExecutor) executor).getQueue().size(), 0); + Assert.assertEquals(0, ((ScheduledThreadPoolExecutor) executor).getQueue().size()); } finally { executor.shutdown(); } diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/ReactorFaultTest.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/ReactorFaultTest.java index d8973a721bcf..f3cb525cb257 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/ReactorFaultTest.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/ReactorFaultTest.java @@ -19,6 +19,7 @@ import org.apache.qpid.proton.reactor.Reactor; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import java.lang.reflect.Field; @@ -34,6 +35,7 @@ public static void initialize() { connStr = TestContext.getConnectionString(); } + @Ignore("TODO: Investigate testcase. This fails.") @Test() public void verifyReactorRestartsOnProtonBugs() throws Exception { final EventHubClient eventHubClient = EventHubClient.createSync(connStr.toString(), TestContext.EXECUTOR_SERVICE); @@ -58,7 +60,7 @@ public void run() { handler.add(new BaseHandler() { @Override public void handle(org.apache.qpid.proton.engine.Event e) { - throw new NullPointerException(); + throw new NullPointerException("The test exception. We want this to restart."); } }); } catch (Exception e) { diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SecurityExceptionsTest.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SecurityExceptionsTest.java index 1416b65db58c..b070a110e0ad 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SecurityExceptionsTest.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SecurityExceptionsTest.java @@ -20,6 +20,7 @@ import org.junit.Test; import java.time.Duration; +import java.util.Locale; import java.util.UUID; public class SecurityExceptionsTest extends ApiTestBase { @@ -89,7 +90,7 @@ public void testEventHubClientUnAuthorizedAccessToken() throws Throwable { final String wrongToken = SharedAccessSignatureTokenProvider.generateSharedAccessSignature( "wrongkey", correctConnectionString.getSasKey(), - String.format("amqps://%s/%s", correctConnectionString.getEndpoint().getHost(), correctConnectionString.getEventHubName()), + String.format(Locale.US, "amqps://%s/%s", correctConnectionString.getEndpoint().getHost(), correctConnectionString.getEventHubName()), Duration.ofSeconds(10)); final ConnectionStringBuilder connectionString = new ConnectionStringBuilder() .setEndpoint(correctConnectionString.getEndpoint()) diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SendLargeMessageTest.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SendLargeMessageTest.java index c7a26fd13b4a..dee6bcbc38ed 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SendLargeMessageTest.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/exceptioncontracts/SendLargeMessageTest.java @@ -19,6 +19,7 @@ import org.junit.Test; import java.time.Instant; +import java.util.Locale; public class SendLargeMessageTest extends ApiTestBase { private static final String PARTITION_ID = "0"; @@ -97,6 +98,6 @@ private void sendLargeMessageTest(int msgSize) throws EventHubException { EventData recdMessage = messages.iterator().next(); - Assert.assertEquals(String.format("sent msg size: %s, recvd msg size: %s", msgSize, recdMessage.getBytes().length), recdMessage.getBytes().length, msgSize); + Assert.assertEquals(String.format(Locale.US, "sent msg size: %s, recvd msg size: %s", msgSize, recdMessage.getBytes().length), recdMessage.getBytes().length, msgSize); } } diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/FaultInjectingReactorFactory.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/FaultInjectingReactorFactory.java index 54b8562f6700..29b01efa89b4 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/FaultInjectingReactorFactory.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/FaultInjectingReactorFactory.java @@ -24,7 +24,7 @@ public void setFaultType(final FaultType faultType) { } @Override - public Reactor create(final ReactorHandler reactorHandler, final int maxFrameSize) throws IOException { + public Reactor create(final ReactorHandler reactorHandler, final int maxFrameSize, final String name) throws IOException { final Reactor reactor = Proton.reactor(reactorHandler); switch (this.faultType) { @@ -44,6 +44,10 @@ public enum FaultType { public static final class NetworkOutageSimulator extends CustomIOHandler { + public NetworkOutageSimulator() { + super("NetworkOutageSimulator"); + } + @Override public void onUnhandled(final Event event) { switch (event.getType()) { diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/SasTokenTestBase.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/SasTokenTestBase.java index 8e66a8c5a2a5..9ba244fe6747 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/SasTokenTestBase.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/SasTokenTestBase.java @@ -9,6 +9,7 @@ import org.junit.BeforeClass; import java.time.Duration; +import java.util.Locale; public class SasTokenTestBase extends ApiTestBase { @@ -24,7 +25,7 @@ public static void replaceConnectionString() throws Exception { .setSharedAccessSignature( SharedAccessSignatureTokenProvider.generateSharedAccessSignature(originalConnectionString.getSasKeyName(), originalConnectionString.getSasKey(), - String.format("amqp://%s/%s", originalConnectionString.getEndpoint().getHost(), originalConnectionString.getEventHubName()), + String.format(Locale.US, "amqp://%s/%s", originalConnectionString.getEndpoint().getHost(), originalConnectionString.getEventHubName()), Duration.ofDays(1)) ) .toString(); diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/TestContext.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/TestContext.java index 3366bc614364..cacc7461ba4e 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/TestContext.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/lib/TestContext.java @@ -9,7 +9,6 @@ import java.util.concurrent.ScheduledExecutorService; public final class TestContext { - public static final ScheduledExecutorService EXECUTOR_SERVICE = Executors.newScheduledThreadPool(1); private static final String EVENT_HUB_CONNECTION_STRING_ENV_NAME = "AZURE_EVENTHUBS_CONNECTION_STRING"; diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveParallelManualTest.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveParallelManualTest.java index 73411d1d19c4..eb73673bf219 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveParallelManualTest.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveParallelManualTest.java @@ -16,6 +16,7 @@ import org.junit.AfterClass; import org.junit.BeforeClass; +import java.util.Locale; import java.util.concurrent.ExecutionException; import java.util.logging.FileHandler; import java.util.logging.Level; @@ -104,14 +105,14 @@ public void run() { long batchSize = (1 + IteratorUtil.getLast(receivedEvents.iterator()).getSystemProperties().getSequenceNumber()) - (IteratorUtil.getFirst(receivedEvents).getSystemProperties().getSequenceNumber()); totalEvents += batchSize; - System.out.println(String.format("[partitionId: %s] received %s events; total sofar: %s, begin: %s, end: %s", + System.out.println(String.format(Locale.US, "[partitionId: %s] received %s events; total sofar: %s, begin: %s, end: %s", sPartitionId, batchSize, totalEvents, IteratorUtil.getLast(receivedEvents.iterator()).getSystemProperties().getSequenceNumber(), IteratorUtil.getFirst(receivedEvents).getSystemProperties().getSequenceNumber())); } else { - System.out.println(String.format("received null on partition %s", sPartitionId)); + System.out.println(String.format(Locale.US, "received null on partition %s", sPartitionId)); } } catch (Exception exp) { System.out.println(exp.getMessage() + exp.toString()); diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveTest.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveTest.java index 51cfe84113d2..ef25a601ca55 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveTest.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiveTest.java @@ -23,6 +23,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Iterator; +import java.util.Locale; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; @@ -69,7 +70,7 @@ public void testReceiverStartOfStreamFilters() throws EventHubException { for (EventData eventDataUsingOffset : startingEventsUsingOffsetReceiver) { EventData eventDataUsingDateTime = dateTimeIterator.next(); Assert.assertTrue( - String.format("START_OF_STREAM offset: %s, EPOCH offset: %s", eventDataUsingOffset.getSystemProperties().getOffset(), eventDataUsingDateTime.getSystemProperties().getOffset()), + String.format(Locale.US, "START_OF_STREAM offset: %s, EPOCH offset: %s", eventDataUsingOffset.getSystemProperties().getOffset(), eventDataUsingDateTime.getSystemProperties().getOffset()), eventDataUsingOffset.getSystemProperties().getOffset().equalsIgnoreCase(eventDataUsingDateTime.getSystemProperties().getOffset())); if (!dateTimeIterator.hasNext()) { diff --git a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/SendTest.java b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/SendTest.java index 6f79da660c55..3006234578c9 100644 --- a/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/SendTest.java +++ b/eventhubs/data-plane/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/SendTest.java @@ -23,6 +23,7 @@ import java.time.Instant; import java.util.LinkedList; import java.util.List; +import java.util.Locale; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -43,6 +44,7 @@ public static void initialize() throws Exception { final ConnectionStringBuilder connectionString = TestContext.getConnectionString(); initializeEventHub(connectionString); } + public static void initializeEventHub(final ConnectionStringBuilder connectionString) throws Exception { ehClient = EventHubClient.createSync(connectionString.toString(), TestContext.EXECUTOR_SERVICE); } @@ -175,7 +177,7 @@ public void onReceive(Iterable events) { for (EventData event : events) { if (!partitionKey.equals(event.getSystemProperties().getPartitionKey())) { this.validateSignal.completeExceptionally( - new AssertionFailedError(String.format("received partitionKey: %s, expected partitionKey: %s", event.getSystemProperties().getPartitionKey(), partitionKey))); + new AssertionFailedError(String.format(Locale.US, "received partitionKey: %s, expected partitionKey: %s", event.getSystemProperties().getPartitionKey(), partitionKey))); } this.currentEventCount++; @@ -215,9 +217,8 @@ public void onReceive(Iterable events) { for (EventData event : events) { final int currentEventOrder = (int) event.getProperties().get(ORDER_PROPERTY); if (currentEventOrder != currentCount) { - this.validateSignal.completeExceptionally(new AssertionError(String.format("expected %s, got %s", currentCount, currentEventOrder))); + this.validateSignal.completeExceptionally(new AssertionError(String.format(Locale.US, "expected %s, got %s", currentCount, currentEventOrder))); } - currentCount++; } } diff --git a/eventhubs/data-plane/pom.xml b/eventhubs/data-plane/pom.xml index e82e44df223e..93d1e09126b5 100644 --- a/eventhubs/data-plane/pom.xml +++ b/eventhubs/data-plane/pom.xml @@ -4,23 +4,23 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + com.azure + azure-client-sdk-parent + 1.0.0 + ../../pom.client.xml + + 4.0.0 com.microsoft.azure azure-eventhubs-clients pom - 2.0.0 + 2.3.0 Microsoft Azure Event Hubs SDK Parent Java libraries for talking to Windows Azure Event Hubs https://github.com/Azure/azure-sdk-for-java - - com.azure - azure-client-sdk-parent - 1.0.0 - ../../pom.client.xml - - azure-java-build-docs @@ -45,6 +45,7 @@ org.slf4j slf4j-api
+ junit junit diff --git a/eventhubs/data-plane/readme.md b/eventhubs/data-plane/readme.md index ea52c6191cd0..38634dbb2979 100644 --- a/eventhubs/data-plane/readme.md +++ b/eventhubs/data-plane/readme.md @@ -41,11 +41,11 @@ the required versions of Apache Qpid Proton-J, and the cryptography library BCPK |azure-eventhubs|[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs) ```XML - - com.microsoft.azure - azure-eventhubs - 2.0.0 - + + com.microsoft.azure + azure-eventhubs + 2.3.0 + ``` #### Microsoft Azure EventHubs Java Event Processor Host library @@ -58,12 +58,12 @@ It pulls the required versions of Event Hubs, Azure Storage and GSon libraries. |azure-eventhubs-eph|[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs-eph/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs-eph) ```XML - - com.microsoft.azure - azure-eventhubs-eph - 2.2.0 - -``` + + com.microsoft.azure + azure-eventhubs-eph + 2.5.0 + +``` ## How to provide feedback diff --git a/pom.client.xml b/pom.client.xml index 460cb031d8cb..225f88e6694d 100644 --- a/pom.client.xml +++ b/pom.client.xml @@ -109,7 +109,7 @@ 1.10 3.1.11 0.31.0 - 1.1.0 + 1.2.0 2.11.1 2.9.3-01 2.4.16-03 @@ -249,13 +249,13 @@ log4j-api ${log4j-api.version} - + com.microsoft.rest.v2 client-runtime ${client-runtime.version.v2} - + org.slf4j slf4j-api @@ -362,7 +362,7 @@ ${cglib-nodep.version} test - + org.slf4j slf4j-simple