From 018dee59feae4a0a5dcfac86939ee5b2b22a517a Mon Sep 17 00:00:00 2001 From: Connie Date: Mon, 7 Sep 2020 00:44:14 -0700 Subject: [PATCH 01/21] Fixing the string format for task. --- .../azure/core/amqp/implementation/handler/DispatchHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/DispatchHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/DispatchHandler.java index 23293da62c2f..5c3c49eb7793 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/DispatchHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/DispatchHandler.java @@ -32,7 +32,7 @@ public DispatchHandler(Runnable work) { */ @Override public void onTimerTask(Event e) { - logger.verbose("Running task for event: %s", e); + logger.verbose("Running task for event: {}", e); this.work.run(); } } From 0f11498187747e57c71d4419ab26a0d584d7d0ed Mon Sep 17 00:00:00 2001 From: Connie Date: Mon, 7 Sep 2020 00:46:22 -0700 Subject: [PATCH 02/21] Adding this. in front of AmqpRetryOptions. --- .../java/com/azure/core/amqp/AmqpRetryOptions.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpRetryOptions.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpRetryOptions.java index 0b266ef518f4..db3daea8af6c 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpRetryOptions.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpRetryOptions.java @@ -23,11 +23,11 @@ public class AmqpRetryOptions { * Creates an instance with the default retry options set. */ public AmqpRetryOptions() { - maxRetries = 3; - delay = Duration.ofMillis(800); - maxDelay = Duration.ofMinutes(1); - tryTimeout = Duration.ofMinutes(1); - retryMode = AmqpRetryMode.EXPONENTIAL; + this.maxRetries = 3; + this.delay = Duration.ofMillis(800); + this.maxDelay = Duration.ofMinutes(1); + this.tryTimeout = Duration.ofMinutes(1); + this.retryMode = AmqpRetryMode.EXPONENTIAL; } /** From a6441c46524c157f092108cae34021e8ffef5359 Mon Sep 17 00:00:00 2001 From: Connie Date: Mon, 7 Sep 2020 07:25:21 -0700 Subject: [PATCH 03/21] Cleaning up print statements. --- .../handler/CustomIOHandler.java | 4 ++- .../handler/DispatchHandler.java | 3 +-- .../implementation/handler/LinkHandler.java | 26 +++++++++---------- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/CustomIOHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/CustomIOHandler.java index d000bdb44717..1258d7698783 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/CustomIOHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/CustomIOHandler.java @@ -9,6 +9,8 @@ import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.reactor.impl.IOHandler; +import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE; + public class CustomIOHandler extends IOHandler { private final ClientLogger logger = new ClientLogger(CustomIOHandler.class); private final String connectionId; @@ -23,7 +25,7 @@ public void onTransportClosed(Event event) { final Connection connection = event.getConnection(); logger.info("onTransportClosed connectionId[{}], hostname[{}]", - connectionId, (connection != null ? connection.getHostname() : "n/a")); + connectionId, (connection != null ? connection.getHostname() : NOT_APPLICABLE)); if (transport != null && connection != null && connection.getTransport() != null) { transport.unbind(); diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/DispatchHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/DispatchHandler.java index 5c3c49eb7793..75b7ec0d330d 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/DispatchHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/DispatchHandler.java @@ -23,8 +23,7 @@ public class DispatchHandler extends BaseHandler { * @param work The work to run on the {@link Reactor}. */ public DispatchHandler(Runnable work) { - Objects.requireNonNull(work); - this.work = work; + this.work = Objects.requireNonNull(work, "'work' cannot be null."); } /** diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java index e347bcc293bf..cffa641a5207 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java @@ -5,7 +5,6 @@ import com.azure.core.amqp.exception.AmqpErrorContext; import com.azure.core.amqp.exception.LinkErrorContext; -import com.azure.core.amqp.implementation.ClientConstants; import com.azure.core.amqp.implementation.ExceptionUtil; import com.azure.core.util.logging.ClientLogger; import org.apache.qpid.proton.amqp.transport.ErrorCondition; @@ -14,11 +13,11 @@ import org.apache.qpid.proton.engine.Link; import static com.azure.core.amqp.implementation.AmqpErrorCode.TRACKING_ID_PROPERTY; +import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE; abstract class LinkHandler extends Handler { - private final String entityPath; - ClientLogger logger; + final ClientLogger logger; LinkHandler(String connectionId, String hostname, String entityPath, ClientLogger logger) { super(connectionId, hostname); @@ -32,9 +31,10 @@ public void onLinkLocalClose(Event event) { final ErrorCondition condition = link.getCondition(); logger.info("onLinkLocalClose connectionId[{}], linkName[{}], errorCondition[{}], errorDescription[{}]", - getConnectionId(), link.getName(), - condition != null ? condition.getCondition() : ClientConstants.NOT_APPLICABLE, - condition != null ? condition.getDescription() : ClientConstants.NOT_APPLICABLE); + getConnectionId(), + link.getName(), + condition != null ? condition.getCondition() : NOT_APPLICABLE, + condition != null ? condition.getDescription() : NOT_APPLICABLE); } @Override @@ -44,8 +44,8 @@ public void onLinkRemoteClose(Event event) { logger.info("onLinkRemoteClose connectionId[{}], linkName[{}], errorCondition[{}], errorDescription[{}]", getConnectionId(), link.getName(), - condition != null ? condition.getCondition() : ClientConstants.NOT_APPLICABLE, - condition != null ? condition.getDescription() : ClientConstants.NOT_APPLICABLE); + condition != null ? condition.getCondition() : NOT_APPLICABLE, + condition != null ? condition.getDescription() : NOT_APPLICABLE); handleRemoteLinkClosed(event); } @@ -57,15 +57,15 @@ public void onLinkRemoteDetach(Event event) { logger.info("onLinkRemoteClose connectionId[{}], linkName[{}], errorCondition[{}], errorDescription[{}]", getConnectionId(), link.getName(), - condition != null ? condition.getCondition() : ClientConstants.NOT_APPLICABLE, - condition != null ? condition.getDescription() : ClientConstants.NOT_APPLICABLE); + condition != null ? condition.getCondition() : NOT_APPLICABLE, + condition != null ? condition.getDescription() : NOT_APPLICABLE); handleRemoteLinkClosed(event); } @Override public void onLinkFinal(Event event) { - logger.info("onLinkFinal connectionId[{}], linkName[{}]", getConnectionId(), event.getLink().getName()); + logger.info("onLinkFinal connectionId[{}], linkName[{}]", getConnectionId(), event.getLink().getName()); close(); } @@ -83,8 +83,8 @@ public AmqpErrorContext getErrorContext(Link link) { private void processOnClose(Link link, ErrorCondition condition) { logger.info("processOnClose connectionId[{}], linkName[{}], errorCondition[{}], errorDescription[{}]", getConnectionId(), link.getName(), - condition != null ? condition.getCondition() : ClientConstants.NOT_APPLICABLE, - condition != null ? condition.getDescription() : ClientConstants.NOT_APPLICABLE); + condition != null ? condition.getCondition() : NOT_APPLICABLE, + condition != null ? condition.getDescription() : NOT_APPLICABLE); if (condition != null && condition.getCondition() != null) { final Throwable exception = ExceptionUtil.toException(condition.getCondition().toString(), From edf4c4524097d4e16917fce133f10349a07dd910 Mon Sep 17 00:00:00 2001 From: Connie Date: Mon, 7 Sep 2020 22:57:06 -0700 Subject: [PATCH 04/21] Fixing issues. --- .../implementation/handler/ReceiveLinkHandler.java | 14 ++++---------- .../implementation/handler/SendLinkHandler.java | 10 +++++++--- .../messaging/eventhubs/PartitionPumpManager.java | 2 +- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java index 0017fed355f9..c64b15571896 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java @@ -21,10 +21,10 @@ public class ReceiveLinkHandler extends LinkHandler { private final String linkName; - private AtomicBoolean isFirstResponse = new AtomicBoolean(true); + private final AtomicBoolean isFirstResponse = new AtomicBoolean(true); private final DirectProcessor deliveries; - private FluxSink deliverySink; - private Set queuedDeliveries = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final FluxSink deliverySink; + private final Set queuedDeliveries = Collections.newSetFromMap(new ConcurrentHashMap<>()); public ReceiveLinkHandler(String connectionId, String hostname, String linkName, String entityPath) { super(connectionId, hostname, entityPath, new ClientLogger(ReceiveLinkHandler.class)); @@ -38,11 +38,9 @@ public String getLinkName() { } public Flux getDeliveredMessages() { - return deliveries - .doOnNext(this::removeQueuedDelivery); + return deliveries.doOnNext(delivery -> queuedDeliveries.remove(delivery)); } - @Override public void close() { deliverySink.complete(); @@ -138,8 +136,4 @@ public void onLinkRemoteClose(Event event) { super.onLinkRemoteClose(event); deliverySink.complete(); } - - private void removeQueuedDelivery(Delivery delivery) { - queuedDeliveries.remove(delivery); - } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java index 1d92def80e0f..ca4a884f992d 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java @@ -18,16 +18,20 @@ import java.util.concurrent.atomic.AtomicBoolean; public class SendLinkHandler extends LinkHandler { - private final String senderName; + private final String linkName; private final AtomicBoolean isFirstFlow = new AtomicBoolean(true); private final UnicastProcessor creditProcessor = UnicastProcessor.create(); private final DirectProcessor deliveryProcessor = DirectProcessor.create(); private final FluxSink creditSink = creditProcessor.sink(); private final FluxSink deliverySink = deliveryProcessor.sink(); - public SendLinkHandler(String connectionId, String hostname, String senderName, String entityPath) { + public SendLinkHandler(String connectionId, String hostname, String linkName, String entityPath) { super(connectionId, hostname, entityPath, new ClientLogger(SendLinkHandler.class)); - this.senderName = senderName; + this.linkName = linkName; + } + + public String getLinkName() { + return linkName; } public Flux getLinkCredits() { diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java index aa41cdd1b7ce..c673fd322176 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java @@ -127,7 +127,7 @@ void stopAllPartitionPumps() { */ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoint) { if (partitionPumps.containsKey(claimedOwnership.getPartitionId())) { - logger.verbose("Consumer is already running for this partition {}", claimedOwnership.getPartitionId()); + logger.verbose("Consumer is already running for this partition {}", claimedOwnership.getPartitionId()); return; } From 14afc3bf8b7a29cd0f58472a46c7fb1580bba0b6 Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 8 Sep 2020 01:52:14 -0700 Subject: [PATCH 05/21] Updating text in README.md --- .../microsoft-azure-eventhubs/README.md | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/README.md b/sdk/eventhubs/microsoft-azure-eventhubs/README.md index 95b9c61dbed2..1df6ef7457e4 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/README.md +++ b/sdk/eventhubs/microsoft-azure-eventhubs/README.md @@ -1,7 +1,7 @@ # Azure Event Hubs (Track 1) client library for Java -

Microsoft Azure Event Hubs Client for Java +

Microsoft Azure Event Hubs Client for Java

Azure Event Hubs is a hyper-scale data ingestion service, fully-managed by Microsoft, that enables you to collect, store and process trillions of events from websites, apps, IoT devices, and any stream of data. @@ -15,7 +15,7 @@ general and for an overview of Event Hubs Client for Java. - An **Event Hub producer** is a source of telemetry data, diagnostics information, usage logs, or other log data, as part of an embedded device solution, a mobile device application, a game title running on a console or other device, - some client or server based business solution, or a web site. + some client or server based business solution, or a website. - An **Event Hub consumer** picks up such information from the Event Hub and processes it. Processing may involve aggregation, complex computation, and filtering. Processing may also involve distribution or storage of the @@ -29,13 +29,15 @@ general and for an overview of Event Hubs Client for Java. - A **consumer group** is a view of an entire Event Hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and from their own - position. There can be at most 5 concurrent readers on a partition per consumer group; however it is recommended that - there is only one active consumer for a given partition and consumer group pairing. Each active reader receives all of + position. There can be at most 5 concurrent readers on a partition per consumer group; however it is recommended + there is only one active consumer for a given partition and consumer group pairing. Each active reader receives all the events from its partition; if there are multiple readers on the same partition, then they will receive duplicate events. -For more concepts and deeper discussion, see: [Event Hubs Features][event_hubs_features]. Also, the concepts for AMQP -are well documented in [OASIS Advanced Messaging Queuing Protocol (AMQP) Version 1.0][oasis_amqp_v1]. +For more concepts and deeper discussion, see: +[Event Hubs Features](https://docs.microsoft.com/azure/event-hubs/event-hubs-features). Also, the concepts for AMQP +are well documented in [OASIS Advanced Messaging Queuing Protocol (AMQP) Version +1.0](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html). ### Referencing the library @@ -50,12 +52,12 @@ the required versions of Apache Qpid Proton-J, and the cryptography library BCPK |--------|------------------| |azure-eventhubs|[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs) -```XML - - com.microsoft.azure - azure-eventhubs - 2.3.1 - +```xml + + com.microsoft.azure + azure-eventhubs + 2.3.1 + ``` #### Microsoft Azure EventHubs Java Event Processor Host library @@ -80,14 +82,14 @@ It pulls the required versions of Event Hubs, Azure Storage and GSon libraries. First, if you experience any issues with the runtime behavior of the Azure Event Hubs service, please consider filing a support request right away. Your options for [getting support are enumerated here](https://azure.microsoft.com/support/options/). In the Azure portal, you can file a support request from the "Help -and support" menu in the upper right hand corner of the page. +and support" menu in the upper right-hand corner of the page. If you find issues in this library or have suggestions for improvement of code or documentation, you can [file an issue in the project's GitHub repository](https://github.com/Azure/azure-sdk-for-java/issues) or send across a pull request - see our [Contribution Guidelines](../azure-messaging-eventhubs/CONTRIBUTING.md). Issues related to runtime behavior of the service, such as sporadic exceptions or apparent service-side performance or -reliability issues can not be handled here. +reliability issues cannot be handled here. Generally, if you want to discuss Azure Event Hubs or this client library with the community and the maintainers, you can turn to [stackoverflow.com under the #azure-eventhub tag](http://stackoverflow.com/questions/tagged/azure-eventhub) From 7c7fef467d2caa3b6cf37f19a6acf496d5f0981a Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 8 Sep 2020 17:40:27 -0700 Subject: [PATCH 06/21] Remove getErrors() which overlaps with getEndpointStates(). Only ouputting one terminal state. --- .../handler/ConnectionHandler.java | 14 ++++---- .../amqp/implementation/handler/Handler.java | 26 +++++++------- .../implementation/handler/LinkHandler.java | 34 +++++++++---------- .../handler/ReceiveLinkHandler.java | 30 ++++++++-------- .../handler/SendLinkHandler.java | 26 +++++++------- .../handler/SessionHandler.java | 13 +++---- .../eventhubs/EventHubClientBuilder.java | 1 - 7 files changed, 73 insertions(+), 71 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ConnectionHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ConnectionHandler.java index 0858cd22bf44..4c73984d1f3a 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ConnectionHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ConnectionHandler.java @@ -152,7 +152,6 @@ public void onTransportError(Event event) { if (connection != null) { notifyErrorContext(connection, condition); - onNext(connection.getRemoteState()); } // onTransportError event is not handled by the global IO Handler for cleanup @@ -172,7 +171,6 @@ public void onTransportClosed(Event event) { if (connection != null) { notifyErrorContext(connection, condition); - onNext(connection.getRemoteState()); } } @@ -208,8 +206,6 @@ public void onConnectionLocalClose(Event event) { transport.unbind(); // we proactively dispose IO even if service fails to close } } - - onNext(connection.getRemoteState()); } @Override @@ -218,9 +214,11 @@ public void onConnectionRemoteClose(Event event) { final ErrorCondition error = connection.getRemoteCondition(); logErrorCondition("onConnectionRemoteClose", connection, error); - - onNext(connection.getRemoteState()); - notifyErrorContext(connection, error); + if (error == null) { + onNext(connection.getRemoteState()); + } else { + notifyErrorContext(connection, error); + } } @Override @@ -262,7 +260,7 @@ private void notifyErrorContext(Connection connection, ErrorCondition condition) final Throwable exception = ExceptionUtil.toException(condition.getCondition().toString(), condition.getDescription(), getErrorContext()); - onNext(exception); + onError(exception); } private void logErrorCondition(String eventName, Connection connection, ErrorCondition error) { diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java index c68f3598a81f..4ddbde2218a9 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java @@ -8,16 +8,15 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.ReplayProcessor; -import reactor.core.publisher.UnicastProcessor; import java.io.Closeable; +import java.util.concurrent.atomic.AtomicBoolean; public abstract class Handler extends BaseHandler implements Closeable { + private final AtomicBoolean isTerminal = new AtomicBoolean(); private final ReplayProcessor endpointStateProcessor = ReplayProcessor.cacheLastOrDefault(EndpointState.UNINITIALIZED); - private final UnicastProcessor errorContextProcessor = UnicastProcessor.create(); private final FluxSink endpointSink = endpointStateProcessor.sink(); - private final FluxSink errorSink = errorContextProcessor.sink(); private final String connectionId; private final String hostname; @@ -38,25 +37,26 @@ public Flux getEndpointStates() { return endpointStateProcessor.distinct(); } - public Flux getErrors() { - return errorContextProcessor; - } - void onNext(EndpointState state) { endpointSink.next(state); + } - if (state == EndpointState.CLOSED) { - endpointSink.complete(); + void onError(Throwable error) { + if (isTerminal.getAndSet(true)) { + return; } - } - void onNext(Throwable context) { - errorSink.next(context); + endpointSink.next(EndpointState.CLOSED); + endpointSink.error(error); } @Override public void close() { + if (isTerminal.getAndSet(true)) { + return; + } + + endpointSink.next(EndpointState.CLOSED); endpointSink.complete(); - errorSink.complete(); } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java index cffa641a5207..73304eb14d54 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java @@ -65,7 +65,11 @@ public void onLinkRemoteDetach(Event event) { @Override public void onLinkFinal(Event event) { - logger.info("onLinkFinal connectionId[{}], linkName[{}]", getConnectionId(), event.getLink().getName()); + final String linkName = event != null && event.getLink() != null + ? event.getLink().getName() + : NOT_APPLICABLE; + logger.info("onLinkFinal connectionId[{}], linkName[{}]", getConnectionId(), linkName); + close(); } @@ -80,7 +84,15 @@ public AmqpErrorContext getErrorContext(Link link) { return new LinkErrorContext(getHostname(), entityPath, referenceId, link.getCredit()); } - private void processOnClose(Link link, ErrorCondition condition) { + private void handleRemoteLinkClosed(final Event event) { + final Link link = event.getLink(); + final ErrorCondition condition = link.getRemoteCondition(); + + if (link.getLocalState() != EndpointState.CLOSED) { + link.setCondition(condition); + link.close(); + } + logger.info("processOnClose connectionId[{}], linkName[{}], errorCondition[{}], errorDescription[{}]", getConnectionId(), link.getName(), condition != null ? condition.getCondition() : NOT_APPLICABLE, @@ -90,21 +102,9 @@ private void processOnClose(Link link, ErrorCondition condition) { final Throwable exception = ExceptionUtil.toException(condition.getCondition().toString(), condition.getDescription(), getErrorContext(link)); - onNext(exception); - } - - onNext(EndpointState.CLOSED); - } - - private void handleRemoteLinkClosed(final Event event) { - final Link link = event.getLink(); - final ErrorCondition condition = link.getRemoteCondition(); - - if (link.getLocalState() != EndpointState.CLOSED) { - link.setCondition(condition); - link.close(); + onError(exception); + } else { + onNext(EndpointState.CLOSED); } - - processOnClose(link, condition); } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java index c64b15571896..bb7474c63d68 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java @@ -4,9 +4,6 @@ package com.azure.core.amqp.implementation.handler; import com.azure.core.util.logging.ClientLogger; -import java.util.Collections; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import org.apache.qpid.proton.amqp.messaging.Modified; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; @@ -17,6 +14,9 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; public class ReceiveLinkHandler extends LinkHandler { @@ -66,18 +66,20 @@ public void onLinkLocalOpen(Event event) { @Override public void onLinkRemoteOpen(Event event) { final Link link = event.getLink(); - if (link instanceof Receiver) { - if (link.getRemoteSource() != null) { - logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], remoteSource[{}]", - getConnectionId(), link.getName(), link.getRemoteSource()); + if (!(link instanceof Receiver)) { + return; + } - if (isFirstResponse.getAndSet(false)) { - onNext(EndpointState.ACTIVE); - } - } else { - logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], action[waitingForError]", - getConnectionId(), link.getName()); + if (link.getRemoteSource() != null) { + logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], remoteSource[{}]", + getConnectionId(), link.getName(), link.getRemoteSource()); + + if (isFirstResponse.getAndSet(false)) { + onNext(EndpointState.ACTIVE); } + } else { + logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], action[waitingForError]", + getConnectionId(), link.getName()); } } @@ -133,7 +135,7 @@ public void onDelivery(Event event) { @Override public void onLinkRemoteClose(Event event) { - super.onLinkRemoteClose(event); deliverySink.complete(); + super.onLinkRemoteClose(event); } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java index ca4a884f992d..4bd2bc8e7003 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java @@ -61,19 +61,21 @@ public void onLinkLocalOpen(Event event) { @Override public void onLinkRemoteOpen(Event event) { final Link link = event.getLink(); - if (link instanceof Sender) { - if (link.getRemoteTarget() != null) { - logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], remoteTarget[{}]", - getConnectionId(), link.getName(), link.getRemoteTarget()); - - if (isFirstFlow.getAndSet(false)) { - onNext(EndpointState.ACTIVE); - } - } else { - logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], remoteTarget[null], remoteSource[null], " - + "action[waitingForError]", - getConnectionId(), link.getName()); + if (!(link instanceof Sender)) { + return; + } + + if (link.getRemoteTarget() != null) { + logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], remoteTarget[{}]", + getConnectionId(), link.getName(), link.getRemoteTarget()); + + if (isFirstFlow.getAndSet(false)) { + onNext(EndpointState.ACTIVE); } + } else { + logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], remoteTarget[null], remoteSource[null], " + + "action[waitingForError]", + getConnectionId(), link.getName()); } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java index 4c9b47d2beb6..5ab35a19bfc9 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java @@ -63,7 +63,7 @@ public void onSessionLocalOpen(Event e) { getConnectionId(), this.entityName, ioException.getMessage()); final Throwable exception = new AmqpException(false, message, ioException, getErrorContext()); - onNext(exception); + onError(exception); } } @@ -117,9 +117,9 @@ public void onSessionRemoteClose(Event e) { session.close(); } - onNext(EndpointState.CLOSED); - - if (condition != null) { + if (condition == null) { + onNext(EndpointState.CLOSED); + } else { final String id = getConnectionId(); final AmqpErrorContext context = getErrorContext(); final Exception exception; @@ -131,12 +131,13 @@ public void onSessionRemoteClose(Event e) { context); } else { exception = ExceptionUtil.toException(condition.getCondition().toString(), - String.format(Locale.US, "onSessionRemoteClose connectionId[%s], entityName[%s]", id, + String.format(Locale.US, + "onSessionRemoteClose connectionId[%s], entityName[%s]", id, entityName), context); } - onNext(exception); + onError(exception); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java index d20613d9a5a0..604961035346 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java @@ -667,5 +667,4 @@ private ProxyOptions getProxyOptions(ProxyAuthenticationType authentication, Str coreProxyOptions.getAddress()), coreProxyOptions.getUsername(), coreProxyOptions.getPassword()); } } - } From e74046c253aedb56c5ddffee99a88aa3d1fa981e Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 8 Sep 2020 17:44:22 -0700 Subject: [PATCH 07/21] Fixing test breaks from removing getErrors. --- .../implementation/ReactorConnection.java | 12 ++-- .../amqp/implementation/ReactorReceiver.java | 19 ++--- .../amqp/implementation/ReactorSender.java | 9 +-- .../amqp/implementation/ReactorSession.java | 9 +-- .../RequestResponseChannel.java | 4 +- .../implementation/ReactorReceiverTest.java | 1 - .../implementation/ReactorSenderTest.java | 35 +++++----- .../implementation/ReactorSessionTest.java | 1 - .../RequestResponseChannelTest.java | 3 - .../implementation/handler/HandlerTest.java | 64 +++++++++++++---- .../handler/LinkHandlerTest.java | 70 ++++++++++++------- 11 files changed, 134 insertions(+), 93 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java index 8d67a2f86907..637820785996 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java @@ -12,8 +12,8 @@ import com.azure.core.amqp.implementation.handler.ConnectionHandler; import com.azure.core.amqp.implementation.handler.SessionHandler; import com.azure.core.util.logging.ClientLogger; -import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.BaseHandler; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Session; @@ -48,7 +48,8 @@ public class ReactorConnection implements AmqpConnection { private final DirectProcessor shutdownSignals = DirectProcessor.create(); private final ReplayProcessor endpointStates = ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED); - private FluxSink endpointStatesSink = endpointStates.sink(FluxSink.OverflowStrategy.BUFFER); + private final FluxSink endpointStatesSink = + endpointStates.sink(FluxSink.OverflowStrategy.BUFFER); private final String connectionId; private final Mono connectionMono; @@ -117,12 +118,7 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption }, () -> { endpointStatesSink.next(AmqpEndpointState.CLOSED); endpointStatesSink.complete(); - }), - - this.handler.getErrors().subscribe(error -> { - logger.error("connectionId[{}] Error occurred in connection handler.", connectionId, error); - endpointStatesSink.error(error); - })); + })); } /** diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java index 5e0fcd044034..c058d9f285e2 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java @@ -70,6 +70,16 @@ protected ReactorReceiver(String entityPath, Receiver receiver, ReceiveLinkHandl }) .subscribeWith(EmitterProcessor.create()); + this.handler.getEndpointStates() + .map(state -> { + logger.verbose("Connection state: {}", state); + return AmqpEndpointStateUtil.getConnectionState(state); + }) + .subscribeWith(endpointStates) + .doFinally(signal -> { + dispose(); + }); + this.subscriptions = Disposables.composite( this.handler.getEndpointStates().subscribe( state -> { @@ -85,13 +95,6 @@ protected ReactorReceiver(String entityPath, Receiver receiver, ReceiveLinkHandl dispose(); }), - this.handler.getErrors().subscribe(error -> { - logger.error("connectionId[{}] linkName[{}] entityPath[{}] Error occurred in link.", - handler.getConnectionId(), receiver.getName(), entityPath, error); - endpointStateSink.error(error); - dispose(); - }), - this.tokenManager.getAuthorizationResults().subscribe( response -> { logger.verbose("Token refreshed: {}", response); @@ -105,7 +108,7 @@ protected ReactorReceiver(String entityPath, Receiver receiver, ReceiveLinkHandl @Override public Flux getEndpointStates() { - return endpointStates; + return endpointStates.distinct(); } @Override diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java index 066c0196626b..eefa57fb7e1f 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java @@ -121,12 +121,7 @@ class ReactorSender implements AmqpSendLink { endpointStateSink.next(AmqpEndpointState.CLOSED); endpointStateSink.complete(); hasConnected.set(false); - }), - - this.handler.getErrors().subscribe(error -> { - logger.error("[{}] Error occurred in sender error handler.", entityPath, error); - endpointStateSink.error(error); - }) + }) ); if (tokenManager != null) { @@ -145,7 +140,7 @@ class ReactorSender implements AmqpSendLink { @Override public Flux getEndpointStates() { - return endpointStates; + return endpointStates.distinct(); } @Override diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java index 6330477d472e..2ecf31953fc4 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java @@ -112,13 +112,8 @@ public ReactorSession(Session session, SessionHandler sessionHandler, String ses endpointStateSink.next(AmqpEndpointState.CLOSED); endpointStateSink.complete(); dispose(); - }), - - this.sessionHandler.getErrors().subscribe(error -> { - logger.error("[{}] Error occurred in session error handler.", sessionName, error); - endpointStateSink.error(error); - dispose(); - })); + }) + ); session.open(); } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java index 0bb755be2a84..d2f7be85eb82 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java @@ -138,12 +138,10 @@ protected RequestResponseChannel(String connectionId, String fullyQualifiedNames receiveLinkHandler.getEndpointStates().subscribe( state -> endpointStatesSink.next(AmqpEndpointStateUtil.getConnectionState(state)), this::handleError, this::dispose), - receiveLinkHandler.getErrors().subscribe(this::handleError), sendLinkHandler.getEndpointStates().subscribe(state -> endpointStatesSink.next(AmqpEndpointStateUtil.getConnectionState(state)), - this::handleError, this::dispose), - sendLinkHandler.getErrors().subscribe(this::handleError) + this::handleError, this::dispose) ); //@formatter:on diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverTest.java index cfbc5b53546c..b60f00bf991e 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverTest.java @@ -144,7 +144,6 @@ void updateEndpointState() { .expectNext(AmqpEndpointState.ACTIVE) .then(() -> receiverHandler.close()) .expectNext(AmqpEndpointState.CLOSED) - .then(() -> reactorReceiver.dispose()) .verifyComplete(); } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java index 356bd1d5e0e9..c26b73e7c5e5 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java @@ -3,28 +3,11 @@ package com.azure.core.amqp.implementation; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.ExponentialAmqpRetryPolicy; import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.exception.AmqpResponseCode; import com.azure.core.amqp.implementation.handler.SendLinkHandler; -import java.io.IOException; -import java.time.Duration; -import java.util.Arrays; -import java.util.List; - import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.UnsignedLong; import org.apache.qpid.proton.amqp.messaging.Accepted; @@ -52,6 +35,23 @@ import reactor.core.publisher.ReplayProcessor; import reactor.test.StepVerifier; +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + /** * Unit tests for {@link ReactorSender} */ @@ -98,7 +98,6 @@ public void setup() throws IOException { FluxSink sink1 = endpointStateReplayProcessor.sink(); sink1.next(EndpointState.ACTIVE); - when(handler.getErrors()).thenReturn(Flux.empty()); when(tokenManager.getAuthorizationResults()).thenReturn(Flux.just(AmqpResponseCode.ACCEPTED)); when(sender.getCredit()).thenReturn(100); when(sender.advance()).thenReturn(true); diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java index e88388d2e44d..1f79642c8ee8 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java @@ -108,7 +108,6 @@ public void verifyEndpointStates() { .expectNext(AmqpEndpointState.ACTIVE) .then(() -> handler.close()) .expectNext(AmqpEndpointState.CLOSED) - .then(() -> reactorSession.dispose()) .expectComplete() .verify(Duration.ofSeconds(10)); } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelTest.java index 80cfc91f2985..665bbeada14d 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelTest.java @@ -31,7 +31,6 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import reactor.core.publisher.DirectProcessor; -import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.ReplayProcessor; import reactor.test.StepVerifier; @@ -121,11 +120,9 @@ void beforeEach() { FluxSink sink1 = endpointStateReplayProcessor.sink(); sink1.next(EndpointState.ACTIVE); when(receiveLinkHandler.getEndpointStates()).thenReturn(endpointStateReplayProcessor); - when(receiveLinkHandler.getErrors()).thenReturn(Flux.never()); when(receiveLinkHandler.getDeliveredMessages()).thenReturn(deliveryProcessor); when(sendLinkHandler.getEndpointStates()).thenReturn(endpointStateReplayProcessor); - when(sendLinkHandler.getErrors()).thenReturn(Flux.never()); } @AfterEach diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/HandlerTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/HandlerTest.java index 78895e08989a..c0a73a451ae2 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/HandlerTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/HandlerTest.java @@ -24,14 +24,7 @@ public void initialHandlerState() { StepVerifier.create(handler.getEndpointStates()) .expectNext(EndpointState.UNINITIALIZED) .then(handler::close) - .verifyComplete(); - } - - @Test - public void initialErrors() { - // Act & Assert - StepVerifier.create(handler.getErrors()) - .then(handler::close) + .expectNext(EndpointState.CLOSED) .verifyComplete(); } @@ -44,6 +37,7 @@ public void propagatesStates() { .expectNext(EndpointState.ACTIVE) .then(() -> handler.onNext(EndpointState.ACTIVE)) .then(handler::close) + .expectNext(EndpointState.CLOSED) .verifyComplete(); } @@ -54,11 +48,55 @@ public void propagatesErrors() { final Throwable exception = new AmqpException(false, "Some test message.", context); // Act & Assert - StepVerifier.create(handler.getErrors()) - .then(() -> handler.onNext(exception)) - .expectNext(exception) - .then(handler::close) - .verifyComplete(); + StepVerifier.create(handler.getEndpointStates()) + .expectNext(EndpointState.UNINITIALIZED) + .then(() -> handler.onError(exception)) + .expectNext(EndpointState.CLOSED) + .expectErrorMatches(e -> e.equals(exception)) + .verify(); + } + + @Test + public void propagatesErrorsOnce() { + // Arrange + final AmqpErrorContext context = new AmqpErrorContext("test namespace."); + final Throwable exception = new AmqpException(false, "Some test message.", context); + final Throwable exception2 = new AmqpException(false, "Some test message2.", context); + + // Act & Assert + StepVerifier.create(handler.getEndpointStates()) + .expectNext(EndpointState.UNINITIALIZED) + .then(() -> { + handler.onError(exception); + handler.onError(exception2); + }) + .expectNext(EndpointState.CLOSED) + .expectErrorMatches(e -> e.equals(exception)) + .verify(); + + StepVerifier.create(handler.getEndpointStates()) + .expectNext(EndpointState.CLOSED) + .expectErrorMatches(e -> e.equals(exception)) + .verify(); + } + + @Test + public void completesOnce() { + // Act & Assert + StepVerifier.create(handler.getEndpointStates()) + .expectNext(EndpointState.UNINITIALIZED) + .then(() -> handler.onNext(EndpointState.ACTIVE)) + .expectNext(EndpointState.ACTIVE) + .then(() -> handler.close()) + .expectNext(EndpointState.CLOSED) + .expectComplete() + .verify(); + + // The last state is always replayed before it is closed. + StepVerifier.create(handler.getEndpointStates()) + .expectNext(EndpointState.CLOSED) + .expectComplete() + .verify(); } private static class TestHandler extends Handler { diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/LinkHandlerTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/LinkHandlerTest.java index f1ff19e8f33c..e052d36cb6a5 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/LinkHandlerTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/LinkHandlerTest.java @@ -28,6 +28,7 @@ import static com.azure.core.amqp.exception.AmqpErrorCondition.LINK_STOLEN; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; @@ -147,18 +148,17 @@ void onLinkRemoteClose() { when(session.getLocalState()).thenReturn(EndpointState.ACTIVE); // Act - StepVerifier.Step endpointState = StepVerifier.create(handler.getEndpointStates()) - .expectNext(EndpointState.CLOSED); - - StepVerifier.Step throwableStep = StepVerifier.create(handler.getErrors()) - .assertNext(error -> { + StepVerifier.create(handler.getEndpointStates()) + .expectNext(EndpointState.UNINITIALIZED) + .then(() -> handler.onLinkRemoteClose(event)) + .expectNext(EndpointState.CLOSED) + .expectErrorSatisfies(error -> { Assertions.assertTrue(error instanceof AmqpException); AmqpException exception = (AmqpException) error; Assertions.assertEquals(LINK_STOLEN, exception.getErrorCondition()); - }); - - handler.onLinkRemoteClose(event); + }) + .verify(); // Assert verify(link).setCondition(errorCondition); @@ -166,17 +166,13 @@ void onLinkRemoteClose() { verify(session, never()).setCondition(errorCondition); verify(session, never()).close(); - - endpointState.thenCancel().verify(); - throwableStep.then(() -> handler.close()) - .verifyComplete(); } /** - * Verifies that it does not close the link when the link is already in a closed endpoint state. + * Verifies that an error is propagated if there is an error condition on close. */ @Test - void onLinkRemoteCloseNoException() { + void onLinkRemoteCloseWithErrorCondition() { // Arrange final ErrorCondition errorCondition = new ErrorCondition(symbol, description); @@ -185,19 +181,17 @@ void onLinkRemoteCloseNoException() { when(link.getLocalState()).thenReturn(EndpointState.CLOSED); // Act & Assert - StepVerifier.Step endpointState = StepVerifier.create(handler.getEndpointStates()) + StepVerifier.create(handler.getEndpointStates()) + .expectNext(EndpointState.UNINITIALIZED) + .then(() -> handler.onLinkRemoteClose(event)) .expectNext(EndpointState.CLOSED) - .expectNoEvent(Duration.ofSeconds(2)); - - StepVerifier.Step throwableStep = StepVerifier.create(handler.getErrors()) - .assertNext(error -> { + .expectErrorSatisfies(error -> { Assertions.assertTrue(error instanceof AmqpException); AmqpException exception = (AmqpException) error; Assertions.assertEquals(LINK_STOLEN, exception.getErrorCondition()); - }); - - handler.onLinkRemoteClose(event); + }) + .verify(); // Assert verify(link, never()).setCondition(errorCondition); @@ -205,11 +199,39 @@ void onLinkRemoteCloseNoException() { verify(session, never()).setCondition(errorCondition); verify(session, never()).close(); + } + + /** + * Verifies that no error is propagated. And it is closed instead. + */ + @Test + void onLinkRemoteCloseNoErrorCondition() { + // Arrange + final ErrorCondition errorCondition = new ErrorCondition(null, description); + final Event finalEvent = mock(Event.class); + + when(link.getRemoteCondition()).thenReturn(errorCondition); + when(link.getSession()).thenReturn(session); + when(link.getLocalState()).thenReturn(EndpointState.CLOSED); - endpointState.thenCancel().verify(); - throwableStep.thenCancel().verify(); + // Act & Assert + StepVerifier.create(handler.getEndpointStates()) + .expectNext(EndpointState.UNINITIALIZED) + .then(() -> handler.onLinkRemoteClose(event)) + .expectNext(EndpointState.CLOSED) + .then(() -> handler.onLinkFinal(finalEvent)) + .expectComplete() + .verify(); + + // Assert + verify(link, never()).setCondition(errorCondition); + verify(link, never()).close(); + + verify(session, never()).setCondition(errorCondition); + verify(session, never()).close(); } + private static final class MockLinkHandler extends LinkHandler { MockLinkHandler(String connectionId, String hostname, String entityPath, ClientLogger logger) { super(connectionId, hostname, entityPath, logger); From 5cbb292d73b9d2c00d145370ba690af6cb7e9206 Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 8 Sep 2020 22:33:45 -0700 Subject: [PATCH 08/21] Clean up tests and use subscribeWith. --- .../amqp/implementation/ReactorReceiver.java | 36 +++------------- .../amqp/implementation/ReactorSender.java | 41 ++++++++----------- .../amqp/implementation/ReactorSession.java | 30 ++------------ .../implementation/ReactorSessionTest.java | 3 +- .../handler/LinkHandlerTest.java | 1 - 5 files changed, 27 insertions(+), 84 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java index c058d9f285e2..5765d153cf87 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java @@ -11,10 +11,8 @@ import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.Message; import reactor.core.Disposable; -import reactor.core.Disposables; import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.ReplayProcessor; import java.io.IOException; @@ -36,13 +34,11 @@ public class ReactorReceiver implements AmqpReceiveLink { private final ReceiveLinkHandler handler; private final TokenManager tokenManager; private final ReactorDispatcher dispatcher; - private final Disposable.Composite subscriptions; + private final Disposable subscriptions; private final AtomicBoolean isDisposed = new AtomicBoolean(); private final EmitterProcessor messagesProcessor; private final ClientLogger logger = new ClientLogger(ReactorReceiver.class); - private final ReplayProcessor endpointStates = - ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED); - private FluxSink endpointStateSink = endpointStates.sink(FluxSink.OverflowStrategy.BUFFER); + private final ReplayProcessor endpointStates; private final AtomicReference> creditSupplier = new AtomicReference<>(); @@ -69,33 +65,14 @@ protected ReactorReceiver(String entityPath, Receiver receiver, ReceiveLinkHandl } }) .subscribeWith(EmitterProcessor.create()); - - this.handler.getEndpointStates() + this.endpointStates = this.handler.getEndpointStates() .map(state -> { logger.verbose("Connection state: {}", state); return AmqpEndpointStateUtil.getConnectionState(state); }) - .subscribeWith(endpointStates) - .doFinally(signal -> { - dispose(); - }); + .subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED)); - this.subscriptions = Disposables.composite( - this.handler.getEndpointStates().subscribe( - state -> { - logger.verbose("Connection state: {}", state); - endpointStateSink.next(AmqpEndpointStateUtil.getConnectionState(state)); - }, error -> { - logger.error("connectionId[{}] linkName[{}] entityPath[{}] Error occurred in connection.", - handler.getConnectionId(), receiver.getName(), entityPath, error); - endpointStateSink.error(error); - dispose(); - }, () -> { - endpointStateSink.next(AmqpEndpointState.CLOSED); - dispose(); - }), - - this.tokenManager.getAuthorizationResults().subscribe( + this.subscriptions = this.tokenManager.getAuthorizationResults().subscribe( response -> { logger.verbose("Token refreshed: {}", response); hasAuthorized.set(true); @@ -103,7 +80,7 @@ protected ReactorReceiver(String entityPath, Receiver receiver, ReceiveLinkHandl logger.info("connectionId[{}], path[{}], linkName[{}] - tokenRenewalFailure[{}]", handler.getConnectionId(), this.entityPath, getLinkName(), error.getMessage()); hasAuthorized.set(false); - }, () -> hasAuthorized.set(false))); + }, () -> hasAuthorized.set(false)); } @Override @@ -165,7 +142,6 @@ public void dispose() { } subscriptions.dispose(); - endpointStateSink.complete(); messagesProcessor.onComplete(); tokenManager.close(); receiver.close(); diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java index eefa57fb7e1f..a31b1e9c9670 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java @@ -29,7 +29,6 @@ import reactor.core.Disposable; import reactor.core.Disposables; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; @@ -74,9 +73,7 @@ class ReactorSender implements AmqpSendLink { private final PriorityQueue pendingSendsQueue = new PriorityQueue<>(1000, new DeliveryTagComparator()); private final ClientLogger logger = new ClientLogger(ReactorSender.class); - private final ReplayProcessor endpointStates = - ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED); - private FluxSink endpointStateSink = endpointStates.sink(FluxSink.OverflowStrategy.BUFFER); + private final ReplayProcessor endpointStates; private final TokenManager tokenManager; private final MessageSerializer messageSerializer; @@ -101,38 +98,33 @@ class ReactorSender implements AmqpSendLink { this.retry = retry; this.timeout = timeout; + this.endpointStates = this.handler.getEndpointStates() + .map(state -> { + logger.verbose("[{}] Connection state: {}", entityPath, state); + this.hasConnected.set(state == EndpointState.ACTIVE); + return AmqpEndpointStateUtil.getConnectionState(state); + }).subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED)); + this.subscriptions = Disposables.composite( this.handler.getDeliveredMessages().subscribe(this::processDeliveredMessage), this.handler.getLinkCredits().subscribe(credit -> { - logger.verbose("Credits on link: {}", credit); + logger.verbose("connectionId[{}], entityPath[{}], linkName[{}]: Credits on link: {}", + handler.getConnectionId(), entityPath, getLinkName(), credit); this.scheduleWorkOnDispatcher(); - }), - - this.handler.getEndpointStates().subscribe( - state -> { - logger.verbose("[{}] Connection state: {}", entityPath, state); - this.hasConnected.set(state == EndpointState.ACTIVE); - endpointStateSink.next(AmqpEndpointStateUtil.getConnectionState(state)); - }, error -> { - logger.error("[{}] Error occurred in sender endpoint handler.", entityPath, error); - endpointStateSink.error(error); - }, () -> { - endpointStateSink.next(AmqpEndpointState.CLOSED); - endpointStateSink.complete(); - hasConnected.set(false); - }) + }) ); if (tokenManager != null) { this.subscriptions.add(this.tokenManager.getAuthorizationResults().subscribe( response -> { - logger.verbose("Token refreshed: {}", response); + logger.verbose("connectionId[{}], entityPath[{}], linkName[{}]: Token refreshed: {}", + handler.getConnectionId(), entityPath, getLinkName(), response); hasAuthorized.set(true); }, error -> { - logger.info("clientId[{}], path[{}], linkName[{}] - tokenRenewalFailure[{}]", - handler.getConnectionId(), this.entityPath, getLinkName(), error.getMessage()); + logger.info("connectionId[{}], entityPath[{}], linkName[{}]: tokenRenewalFailure[{}]", + handler.getConnectionId(), entityPath, getLinkName(), error.getMessage()); hasAuthorized.set(false); }, () -> hasAuthorized.set(false))); } @@ -140,7 +132,7 @@ class ReactorSender implements AmqpSendLink { @Override public Flux getEndpointStates() { - return endpointStates.distinct(); + return endpointStates; } @Override @@ -293,7 +285,6 @@ public void dispose() { } subscriptions.dispose(); - endpointStateSink.complete(); tokenManager.close(); } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java index 2ecf31953fc4..c7875bcf0cd3 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java @@ -24,9 +24,7 @@ import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; import reactor.core.Disposable; -import reactor.core.Disposables; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; @@ -49,9 +47,7 @@ public class ReactorSession implements AmqpSession { private final AtomicBoolean isDisposed = new AtomicBoolean(); private final ClientLogger logger = new ClientLogger(ReactorSession.class); - private final ReplayProcessor endpointStates = - ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED); - private FluxSink endpointStateSink = endpointStates.sink(FluxSink.OverflowStrategy.BUFFER); + private final ReplayProcessor endpointStates; private final Session session; private final SessionHandler sessionHandler; @@ -61,7 +57,6 @@ public class ReactorSession implements AmqpSession { private final MessageSerializer messageSerializer; private final Duration openTimeout; - private final Disposable.Composite subscriptions; private final ReactorHandlerProvider handlerProvider; private final Mono cbsNodeSupplier; @@ -98,22 +93,9 @@ public ReactorSession(Session session, SessionHandler sessionHandler, String ses this.messageSerializer = messageSerializer; this.openTimeout = openTimeout; this.retryPolicy = retryPolicy; - - this.subscriptions = Disposables.composite( - this.sessionHandler.getEndpointStates().subscribe( - state -> { - logger.verbose("Connection state: {}", state); - endpointStateSink.next(AmqpEndpointStateUtil.getConnectionState(state)); - }, error -> { - logger.error("[{}] Error occurred in session endpoint handler.", sessionName, error); - endpointStateSink.error(error); - dispose(); - }, () -> { - endpointStateSink.next(AmqpEndpointState.CLOSED); - endpointStateSink.complete(); - dispose(); - }) - ); + this.endpointStates = sessionHandler.getEndpointStates() + .map(AmqpEndpointStateUtil::getConnectionState) + .subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED)); session.open(); } @@ -144,13 +126,9 @@ public void dispose() { logger.info("sessionId[{}]: Disposing of session.", sessionName); session.close(); - subscriptions.dispose(); openReceiveLinks.forEach((key, link) -> link.dispose()); - openReceiveLinks.clear(); - openSendLinks.forEach((key, link) -> link.dispose()); - openSendLinks.clear(); } /** diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java index 1f79642c8ee8..a28945cf8892 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java @@ -39,7 +39,6 @@ public class ReactorSessionTest { private SessionHandler handler; private ReactorSession reactorSession; - private AmqpRetryPolicy retryPolicy; @Mock private Session session; @@ -71,7 +70,7 @@ public void setup() throws IOException { MockReactorHandlerProvider handlerProvider = new MockReactorHandlerProvider(reactorProvider, null, handler, null, null); AzureTokenManagerProvider azureTokenManagerProvider = new AzureTokenManagerProvider( CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, HOST, "a-test-scope"); - this.retryPolicy = RetryUtil.getRetryPolicy(new AmqpRetryOptions()); + AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(new AmqpRetryOptions()); this.reactorSession = new ReactorSession(session, handler, NAME, reactorProvider, handlerProvider, Mono.just(cbsNode), azureTokenManagerProvider, serializer, TIMEOUT, retryPolicy); diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/LinkHandlerTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/LinkHandlerTest.java index e052d36cb6a5..d0060c447f36 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/LinkHandlerTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/LinkHandlerTest.java @@ -52,7 +52,6 @@ class LinkHandlerTest { private final String description = "test-description"; private final LinkHandler handler = new MockLinkHandler(CONNECTION_ID, HOSTNAME, ENTITY_PATH, logger); - @BeforeAll static void beforeAll() { StepVerifier.setDefaultTimeout(Duration.ofSeconds(30)); From a73a67a18214f245dbb2b4edd9d9dd9e65cfee4b Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 8 Sep 2020 23:39:44 -0700 Subject: [PATCH 09/21] Adding test. --- .../amqp/implementation/ReactorSession.java | 9 +- .../handler/SessionHandler.java | 19 +-- .../implementation/ReactorSessionTest.java | 109 ++++++++++++++---- .../handler/ReceiveLinkHandlerTest.java | 10 ++ 4 files changed, 112 insertions(+), 35 deletions(-) create mode 100644 sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandlerTest.java diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java index c7875bcf0cd3..1cc6c42a4325 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java @@ -123,7 +123,8 @@ public void dispose() { return; } - logger.info("sessionId[{}]: Disposing of session.", sessionName); + logger.info("connectionId[{}], sessionId[{}]: Disposing of session.", sessionHandler.getConnectionId(), + sessionName); session.close(); @@ -334,8 +335,8 @@ private boolean removeLink(ConcurrentMap createConsumer(String linkName, String entityPath, Duration timeout, - AmqpRetryPolicy retry, Map sourceFilters, - Map receiverProperties, Symbol[] receiverDesiredCapabilities, SenderSettleMode senderSettleMode, + AmqpRetryPolicy retry, Map sourceFilters, Map receiverProperties, + Symbol[] receiverDesiredCapabilities, SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode) { if (isDisposed()) { @@ -418,7 +419,7 @@ protected Mono createProducer(String linkName, String entityPath, Dura return RetryUtil.withRetry( getEndpointStates().takeUntil(state -> state == AmqpEndpointState.ACTIVE), - timeout, retry).then(tokenManager.authorize()).then(Mono.create(sink -> { + timeout, retry).then(tokenManager.authorize()).then(Mono.create(sink -> { try { // We have to invoke this in the same thread or else proton-j will not properly link up the created // sender because the link names are not unique. Link name == entity path. diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java index 5ab35a19bfc9..0fb9aee7786b 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java @@ -70,13 +70,16 @@ public void onSessionLocalOpen(Event e) { @Override public void onSessionRemoteOpen(Event e) { final Session session = e.getSession(); - - logger.info( - "onSessionRemoteOpen connectionId[{}], entityName[{}], sessionIncCapacity[{}], sessionOutgoingWindow[{}]", - getConnectionId(), entityName, session.getIncomingCapacity(), session.getOutgoingWindow()); - if (session.getLocalState() == EndpointState.UNINITIALIZED) { + logger.warning("onSessionRemoteOpen connectionId[{}], entityName[{}], sessionIncCapacity[{}]," + + " sessionOutgoingWindow[{}] endpoint was uninitialised.", + getConnectionId(), entityName, session.getIncomingCapacity(), session.getOutgoingWindow()); + session.open(); + } else { + logger.info("onSessionRemoteOpen connectionId[{}], entityName[{}], sessionIncCapacity[{}]," + + " sessionOutgoingWindow[{}]", + getConnectionId(), entityName, session.getIncomingCapacity(), session.getOutgoingWindow()); } onNext(EndpointState.ACTIVE); @@ -124,14 +127,12 @@ public void onSessionRemoteClose(Event e) { final AmqpErrorContext context = getErrorContext(); final Exception exception; if (condition.getCondition() == null) { - exception = new AmqpException(false, - String.format(Locale.US, + exception = new AmqpException(false, String.format(Locale.US, "onSessionRemoteClose connectionId[%s], entityName[%s], condition[%s]", id, entityName, condition), context); } else { - exception = ExceptionUtil.toException(condition.getCondition().toString(), - String.format(Locale.US, + exception = ExceptionUtil.toException(condition.getCondition().toString(), String.format(Locale.US, "onSessionRemoteClose connectionId[%s], entityName[%s]", id, entityName), context); diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java index a28945cf8892..646ae40e1c06 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java @@ -4,15 +4,21 @@ package com.azure.core.amqp.implementation; import com.azure.core.amqp.AmqpEndpointState; +import com.azure.core.amqp.AmqpLink; import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.AmqpRetryPolicy; import com.azure.core.amqp.ClaimsBasedSecurityNode; +import com.azure.core.amqp.exception.AmqpResponseCode; +import com.azure.core.amqp.implementation.handler.SendLinkHandler; import com.azure.core.amqp.implementation.handler.SessionHandler; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Record; +import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.reactor.Reactor; -import org.apache.qpid.proton.reactor.Selectable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -20,12 +26,20 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import java.io.IOException; import java.time.Duration; - +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -45,45 +59,55 @@ public class ReactorSessionTest { @Mock private Reactor reactor; @Mock - private Selectable selectable; - @Mock private Event event; @Mock + private Receiver receiver; + @Mock + private Sender sender; + @Mock + private Record record; + @Mock private ClaimsBasedSecurityNode cbsNode; @Mock private MessageSerializer serializer; @Mock private ReactorProvider reactorProvider; + @Mock + private ReactorHandlerProvider reactorHandlerProvider; + @Mock + private ReactorDispatcher reactorDispatcher; + @Mock + private TokenManagerProvider tokenManagerProvider; + + private Mono cbsNodeSupplier; @BeforeEach public void setup() throws IOException { MockitoAnnotations.initMocks(this); - when(reactor.selectable()).thenReturn(selectable); - when(event.getSession()).thenReturn(session); - ReactorDispatcher dispatcher = new ReactorDispatcher(reactor); - this.handler = new SessionHandler(ID, HOST, ENTITY_PATH, dispatcher, Duration.ofSeconds(60)); + this.handler = new SessionHandler(ID, HOST, ENTITY_PATH, reactorDispatcher, Duration.ofSeconds(60)); + this.cbsNodeSupplier = Mono.just(cbsNode); when(reactorProvider.getReactor()).thenReturn(reactor); - when(reactorProvider.getReactorDispatcher()).thenReturn(dispatcher); + when(reactorProvider.getReactorDispatcher()).thenReturn(reactorDispatcher); - MockReactorHandlerProvider handlerProvider = new MockReactorHandlerProvider(reactorProvider, null, handler, null, null); - AzureTokenManagerProvider azureTokenManagerProvider = new AzureTokenManagerProvider( - CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, HOST, "a-test-scope"); - AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(new AmqpRetryOptions()); - this.reactorSession = new ReactorSession(session, handler, NAME, reactorProvider, handlerProvider, - Mono.just(cbsNode), azureTokenManagerProvider, serializer, TIMEOUT, retryPolicy); + when(event.getSession()).thenReturn(session); + when(sender.attachments()).thenReturn(record); + when(receiver.attachments()).thenReturn(record); + doAnswer(invocation -> { + final Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(reactorDispatcher).invoke(any()); + + AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(new AmqpRetryOptions()); + this.reactorSession = new ReactorSession(session, handler, NAME, reactorProvider, reactorHandlerProvider, + cbsNodeSupplier, tokenManagerProvider, serializer, TIMEOUT, retryPolicy); } @AfterEach public void teardown() { - session = null; - reactor = null; - selectable = null; - event = null; - cbsNode = null; - Mockito.framework().clearInlineMocks(); } @@ -114,6 +138,47 @@ public void verifyEndpointStates() { @Test public void verifyDispose() { reactorSession.dispose(); - Assertions.assertTrue(reactorSession.isDisposed()); + assertTrue(reactorSession.isDisposed()); + } + + @Test + void createProducer() { + // Arrange + final String linkName = "test-link-name"; + final String entityPath = "test-entity-path"; + final AmqpRetryPolicy amqpRetryPolicy = mock(AmqpRetryPolicy.class); + final Map linkProperties = new HashMap<>(); + final Duration timeout = Duration.ofSeconds(30); + final TokenManager tokenManager = mock(TokenManager.class); + final SendLinkHandler sendLinkHandler = new SendLinkHandler(ID, HOST, linkName, entityPath); + + when(session.sender(linkName)).thenReturn(sender); + when(tokenManagerProvider.getTokenManager(cbsNodeSupplier, entityPath)).thenReturn(tokenManager); + when(tokenManager.authorize()).thenReturn(Mono.just(1000L)); + when(tokenManager.getAuthorizationResults()) + .thenReturn(Flux.create(sink -> sink.next(AmqpResponseCode.ACCEPTED))); + when(reactorHandlerProvider.createSendLinkHandler(ID, HOST, linkName, entityPath)) + .thenReturn(sendLinkHandler); + + StepVerifier.create( + reactorSession.createProducer(linkName, entityPath, timeout, amqpRetryPolicy, linkProperties)) + .then(() -> handler.onSessionRemoteOpen(event)) + .thenAwait(Duration.ofSeconds(2)) + .assertNext(producer -> assertTrue(producer instanceof ReactorSender)) + .verifyComplete(); + + final AmqpLink sendLink = reactorSession.createProducer(linkName, entityPath, timeout, amqpRetryPolicy, + linkProperties) + .block(TIMEOUT); + + assertNotNull(sendLink); + } + + @Test + void createConsumer() { + // Arrange + final String linkName = "test-link-name"; + final String entityPath = "test-entity-path"; + final AmqpRetryPolicy amqpRetryPolicy = mock(AmqpRetryPolicy.class); } } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandlerTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandlerTest.java new file mode 100644 index 000000000000..e33929742ccd --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandlerTest.java @@ -0,0 +1,10 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation.handler; + +/** + * Tests for {@link ReceiveLinkHandler}. + */ +class ReceiveLinkHandlerTest { +} From 95de918b9b36a7ce899b641712970ea404684a26 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 9 Sep 2020 01:21:30 -0700 Subject: [PATCH 10/21] Adding final. --- .../java/com/azure/core/amqp/implementation/ReactorSession.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java index 1cc6c42a4325..46ea5d295c86 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java @@ -63,7 +63,7 @@ public class ReactorSession implements AmqpSession { private final AtomicReference> coordinatorLink = new AtomicReference<>(); private final AtomicReference transactionCoordinator = new AtomicReference<>(); - private AmqpRetryPolicy retryPolicy; + private final AmqpRetryPolicy retryPolicy; /** * Creates a new AMQP session using proton-j. From 7a9883b5988a12cc9b595a08571f0443f47153b9 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 9 Sep 2020 01:21:41 -0700 Subject: [PATCH 11/21] Adding test for session --- .../implementation/ReactorSessionTest.java | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java index 646ae40e1c06..40824916875a 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java @@ -18,6 +18,7 @@ import org.apache.qpid.proton.engine.Record; import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; +import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.reactor.Reactor; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -141,6 +142,9 @@ public void verifyDispose() { assertTrue(reactorSession.isDisposed()); } + /** + * Verifies that we can create the producer. + */ @Test void createProducer() { // Arrange @@ -174,6 +178,44 @@ void createProducer() { assertNotNull(sendLink); } + /** + * Verifies that we can create the producer. + */ + @Test + void createProducerAgainAfterException() { + // Arrange + final String linkName = "test-link-name"; + final String entityPath = "test-entity-path"; + final AmqpRetryPolicy amqpRetryPolicy = mock(AmqpRetryPolicy.class); + final Map linkProperties = new HashMap<>(); + final Duration timeout = Duration.ofSeconds(30); + final TokenManager tokenManager = mock(TokenManager.class); + final SendLinkHandler sendLinkHandler = new SendLinkHandler(ID, HOST, linkName, entityPath); + final Message message = mock(Message.class); + + when(session.sender(linkName)).thenReturn(sender); + when(tokenManagerProvider.getTokenManager(cbsNodeSupplier, entityPath)).thenReturn(tokenManager); + when(tokenManager.authorize()).thenReturn(Mono.just(1000L)); + when(tokenManager.getAuthorizationResults()) + .thenReturn(Flux.create(sink -> sink.next(AmqpResponseCode.ACCEPTED))); + when(reactorHandlerProvider.createSendLinkHandler(ID, HOST, linkName, entityPath)) + .thenReturn(sendLinkHandler); + + handler.onSessionRemoteOpen(event); + + final AmqpLink sendLink = reactorSession.createProducer(linkName, entityPath, timeout, amqpRetryPolicy, + linkProperties) + .block(TIMEOUT); + + assertNotNull(sendLink); + assertTrue(sendLink instanceof AmqpSendLink); + + final AmqpSendLink reactorSendLink = (AmqpSendLink) sendLink; + + // Act & Assert + StepVerifier.create(reactorSendLink.send(message)); + } + @Test void createConsumer() { // Arrange From c940d34284b77d363b7f5d1debc4204f8c852bbf Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 9 Sep 2020 08:27:30 -0700 Subject: [PATCH 12/21] Using close() --- .../azure/core/amqp/implementation/handler/LinkHandler.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java index 73304eb14d54..8ea445cb91ed 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java @@ -69,7 +69,6 @@ public void onLinkFinal(Event event) { ? event.getLink().getName() : NOT_APPLICABLE; logger.info("onLinkFinal connectionId[{}], linkName[{}]", getConnectionId(), linkName); - close(); } @@ -104,7 +103,7 @@ private void handleRemoteLinkClosed(final Event event) { onError(exception); } else { - onNext(EndpointState.CLOSED); + close(); } } } From ecd66e4ac9aee092aa2c444999cefb17462184a2 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 9 Sep 2020 09:36:20 -0700 Subject: [PATCH 13/21] Updating log messages. --- .../implementation/ReactorConnection.java | 35 ++++--------------- .../amqp/implementation/ReactorReceiver.java | 3 +- .../amqp/implementation/ReactorSender.java | 3 +- .../amqp/implementation/ReactorSession.java | 6 +++- 4 files changed, 16 insertions(+), 31 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java index 637820785996..35b7443a09ff 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java @@ -20,10 +20,8 @@ import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.reactor.Reactor; import reactor.core.Disposable; -import reactor.core.Disposables; import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; import reactor.core.scheduler.Schedulers; @@ -46,10 +44,7 @@ public class ReactorConnection implements AmqpConnection { private final AtomicBoolean hasConnection = new AtomicBoolean(); private final AtomicBoolean isDisposed = new AtomicBoolean(); private final DirectProcessor shutdownSignals = DirectProcessor.create(); - private final ReplayProcessor endpointStates = - ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED); - private final FluxSink endpointStatesSink = - endpointStates.sink(FluxSink.OverflowStrategy.BUFFER); + private final ReplayProcessor endpointStates; private final String connectionId; private final Mono connectionMono; @@ -59,7 +54,6 @@ public class ReactorConnection implements AmqpConnection { private final MessageSerializer messageSerializer; private final ConnectionOptions connectionOptions; private final ReactorProvider reactorProvider; - private final Disposable.Composite subscriptions; private final AmqpRetryPolicy retryPolicy; private final SenderSettleMode senderSettleMode; private final ReceiverSettleMode receiverSettleMode; @@ -87,8 +81,8 @@ public class ReactorConnection implements AmqpConnection { */ public ReactorConnection(String connectionId, ConnectionOptions connectionOptions, ReactorProvider reactorProvider, ReactorHandlerProvider handlerProvider, TokenManagerProvider tokenManagerProvider, - MessageSerializer messageSerializer, String product, String clientVersion, - SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode) { + MessageSerializer messageSerializer, String product, String clientVersion, SenderSettleMode senderSettleMode, + ReceiverSettleMode receiverSettleMode) { this.connectionOptions = connectionOptions; this.reactorProvider = reactorProvider; @@ -107,18 +101,10 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption this.connectionMono = Mono.fromCallable(this::getOrCreateConnection) .doOnSubscribe(c -> hasConnection.set(true)); - this.subscriptions = Disposables.composite( - this.handler.getEndpointStates().subscribe( - state -> { - logger.verbose("connectionId[{}]: Connection state: {}", connectionId, state); - endpointStatesSink.next(AmqpEndpointStateUtil.getConnectionState(state)); - }, error -> { - logger.error("connectionId[{}] Error occurred in connection endpoint.", connectionId, error); - endpointStatesSink.error(error); - }, () -> { - endpointStatesSink.next(AmqpEndpointState.CLOSED); - endpointStatesSink.complete(); - })); + this.endpointStates = this.handler.getEndpointStates().map(state -> { + logger.verbose("connectionId[{}]: State {}", connectionId, state); + return AmqpEndpointStateUtil.getConnectionState(state); + }).subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED)); } /** @@ -273,8 +259,6 @@ public void dispose() { } logger.info("connectionId[{}]: Disposing of ReactorConnection.", connectionId); - subscriptions.dispose(); - endpointStatesSink.complete(); final String[] keys = sessionMap.keySet().toArray(new String[0]); for (String key : keys) { @@ -389,11 +373,6 @@ void onConnectionShutdown(AmqpShutdownSignal shutdownSignal) { "onReactorError connectionId[{}], hostName[{}], message[Shutting down], shutdown signal[{}]", getId(), getFullyQualifiedNamespace(), shutdownSignal.isInitiatedByClient(), shutdownSignal); - if (!endpointStatesSink.isCancelled()) { - endpointStatesSink.next(AmqpEndpointState.CLOSED); - endpointStatesSink.complete(); - } - dispose(); } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java index 5765d153cf87..d30e17936903 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java @@ -67,7 +67,8 @@ protected ReactorReceiver(String entityPath, Receiver receiver, ReceiveLinkHandl .subscribeWith(EmitterProcessor.create()); this.endpointStates = this.handler.getEndpointStates() .map(state -> { - logger.verbose("Connection state: {}", state); + logger.verbose("connectionId[{}], path[{}], linkName[{}]: State {}", handler.getConnectionId(), + entityPath, getLinkName(), state); return AmqpEndpointStateUtil.getConnectionState(state); }) .subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED)); diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java index a31b1e9c9670..b66a947ee87b 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java @@ -100,7 +100,8 @@ class ReactorSender implements AmqpSendLink { this.endpointStates = this.handler.getEndpointStates() .map(state -> { - logger.verbose("[{}] Connection state: {}", entityPath, state); + logger.verbose("connectionId[{}], path[{}], linkName[{}]: State {}", handler.getConnectionId(), + entityPath, getLinkName(), state); this.hasConnected.set(state == EndpointState.ACTIVE); return AmqpEndpointStateUtil.getConnectionState(state); }).subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED)); diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java index 46ea5d295c86..c116e2f5605b 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java @@ -94,7 +94,11 @@ public ReactorSession(Session session, SessionHandler sessionHandler, String ses this.openTimeout = openTimeout; this.retryPolicy = retryPolicy; this.endpointStates = sessionHandler.getEndpointStates() - .map(AmqpEndpointStateUtil::getConnectionState) + .map(state -> { + logger.verbose("connectionId[{}], sessionName[{}]: State ", sessionHandler.getConnectionId(), + sessionName, state); + return AmqpEndpointStateUtil.getConnectionState(state); + }) .subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED)); session.open(); From 773dde99aed02fd16738532b488a16dcff1c4987 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 9 Sep 2020 12:36:19 -0700 Subject: [PATCH 14/21] Locally closing on connection error. --- .../implementation/ReactorConnection.java | 68 ++++++++++++------- .../amqp/implementation/ReactorReceiver.java | 37 ++++++++++ .../amqp/implementation/ReactorSender.java | 25 +++++++ .../amqp/implementation/ReactorSession.java | 49 ++++++++----- .../implementation/ReactorConnectionTest.java | 6 +- .../implementation/ReactorSessionTest.java | 17 +++-- 6 files changed, 153 insertions(+), 49 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java index 35b7443a09ff..9584b9452f82 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java @@ -12,6 +12,8 @@ import com.azure.core.amqp.implementation.handler.ConnectionHandler; import com.azure.core.amqp.implementation.handler.SessionHandler; import com.azure.core.util.logging.ClientLogger; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.BaseHandler; @@ -22,6 +24,7 @@ import reactor.core.Disposable; import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; import reactor.core.scheduler.Schedulers; @@ -33,6 +36,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE; + public class ReactorConnection implements AmqpConnection { private static final String CBS_SESSION_NAME = "cbs-session"; private static final String CBS_ADDRESS = "$cbs"; @@ -44,6 +49,7 @@ public class ReactorConnection implements AmqpConnection { private final AtomicBoolean hasConnection = new AtomicBoolean(); private final AtomicBoolean isDisposed = new AtomicBoolean(); private final DirectProcessor shutdownSignals = DirectProcessor.create(); + private final FluxSink shutdownSignalsSink = shutdownSignals.sink(); private final ReplayProcessor endpointStates; private final String connectionId; @@ -101,7 +107,9 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption this.connectionMono = Mono.fromCallable(this::getOrCreateConnection) .doOnSubscribe(c -> hasConnection.set(true)); - this.endpointStates = this.handler.getEndpointStates().map(state -> { + this.endpointStates = this.handler.getEndpointStates() + .takeUntilOther(shutdownSignals) + .map(state -> { logger.verbose("connectionId[{}]: State {}", connectionId, state); return AmqpEndpointStateUtil.getConnectionState(state); }).subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED)); @@ -231,17 +239,7 @@ protected AmqpSession createSession(String sessionName, Session session, Session */ @Override public boolean removeSession(String sessionName) { - if (sessionName == null) { - return false; - } - - final SessionSubscription removed = sessionMap.remove(sessionName); - - if (removed != null) { - removed.dispose(); - } - - return removed != null; + return removeSession(sessionName, null); } @Override @@ -254,16 +252,23 @@ public boolean isDisposed() { */ @Override public void dispose() { + dispose(null); + shutdownSignalsSink.next(new AmqpShutdownSignal(false, true, + "Disposed by client.")); + } + + public void dispose(ErrorCondition errorCondition) { if (isDisposed.getAndSet(true)) { return; } - logger.info("connectionId[{}]: Disposing of ReactorConnection.", connectionId); + logger.info("connectionId[{}], errorCondition[{}]: Disposing of ReactorConnection.", connectionId, + errorCondition != null ? errorCondition : NOT_APPLICABLE); final String[] keys = sessionMap.keySet().toArray(new String[0]); for (String key : keys) { logger.info("connectionId[{}]: Removing session '{}'", connectionId, key); - removeSession(key); + removeSession(key, errorCondition); } if (connection != null) { @@ -311,6 +316,20 @@ protected Mono createRequestResponseChannel(String sessi new ClientLogger(String.format("%s<%s>", RequestResponseChannel.class, sessionName)))); } + private boolean removeSession(String sessionName, ErrorCondition errorCondition) { + if (sessionName == null) { + return false; + } + + final SessionSubscription removed = sessionMap.remove(sessionName); + + if (removed != null) { + removed.dispose(errorCondition); + } + + return removed != null; + } + private synchronized ClaimsBasedSecurityNode getOrCreateCBSNode() { if (cbsChannel == null) { logger.info("Setting CBS channel."); @@ -360,6 +379,7 @@ public void onConnectionError(Throwable exception) { getId(), getFullyQualifiedNamespace(), exception.getMessage()); endpointStates.onError(exception); + ReactorConnection.this.dispose(); } @Override @@ -373,11 +393,12 @@ void onConnectionShutdown(AmqpShutdownSignal shutdownSignal) { "onReactorError connectionId[{}], hostName[{}], message[Shutting down], shutdown signal[{}]", getId(), getFullyQualifiedNamespace(), shutdownSignal.isInitiatedByClient(), shutdownSignal); - dispose(); + dispose(new ErrorCondition(Symbol.getSymbol("onReactorError"), shutdownSignal.toString())); + shutdownSignalsSink.next(shutdownSignal); } } - private static final class SessionSubscription implements Disposable { + private static final class SessionSubscription { private final AtomicBoolean isDisposed = new AtomicBoolean(); private final AmqpSession session; private final Disposable subscription; @@ -387,22 +408,23 @@ private SessionSubscription(AmqpSession session, Disposable subscription) { this.subscription = subscription; } - public Disposable getSubscription() { - return subscription; - } - public AmqpSession getSession() { return session; } - @Override - public void dispose() { + public void dispose(ErrorCondition errorCondition) { if (isDisposed.getAndSet(true)) { return; } + if (session instanceof ReactorSession) { + final ReactorSession reactorSession = (ReactorSession) session; + reactorSession.dispose(errorCondition); + } else { + session.dispose(); + } + subscription.dispose(); - session.dispose(); } } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java index d30e17936903..8ea3afd1ccc0 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java @@ -7,7 +7,9 @@ import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler; import com.azure.core.util.logging.ClientLogger; import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.Message; import reactor.core.Disposable; @@ -158,6 +160,41 @@ public void dispose() { } } + /** + * Disposes of the sender when an exception is encountered. + * + * @param condition Error condition associated with close operation. + */ + public void dispose(ErrorCondition condition) { + if (isDisposed.getAndSet(true)) { + return; + } + + logger.verbose("connectionId[{}], path[{}], linkName[{}]: setting error condition {}", + handler.getConnectionId(), entityPath, getLinkName(), condition); + + if (receiver.getLocalState() != EndpointState.CLOSED) { + receiver.close(); + + if (receiver.getCondition() == null) { + receiver.setCondition(condition); + } + } + + try { + dispatcher.invoke(() -> { + receiver.free(); + handler.close(); + }); + } catch (IOException e) { + logger.warning("Could not schedule disposing of receiver on ReactorDispatcher.", e); + handler.close(); + } + + messagesProcessor.onComplete(); + tokenManager.close(); + } + protected Message decodeDelivery(Delivery delivery) { final int messageSize = delivery.pending(); final byte[] buffer = new byte[messageSize]; diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java index b66a947ee87b..5b696ca957c5 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java @@ -21,6 +21,7 @@ import org.apache.qpid.proton.amqp.messaging.Released; import org.apache.qpid.proton.amqp.transaction.Declared; import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Sender; @@ -50,6 +51,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static com.azure.core.amqp.implementation.ClientConstants.MAX_AMQP_HEADER_SIZE_BYTES; +import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE; import static com.azure.core.amqp.implementation.ClientConstants.SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS; import static java.nio.charset.StandardCharsets.UTF_8; @@ -281,12 +283,35 @@ public boolean isDisposed() { @Override public void dispose() { + dispose(null); + } + + /** + * Disposes of the sender when an exception is encountered. + * + * @param errorCondition Error condition associated with close operation. + */ + public void dispose(ErrorCondition errorCondition) { if (isDisposed.getAndSet(true)) { return; } subscriptions.dispose(); tokenManager.close(); + + if (sender.getLocalState() == EndpointState.CLOSED) { + return; + } + + logger.verbose("connectionId[{}], path[{}], linkName[{}]: setting error condition {}", + handler.getConnectionId(), entityPath, getLinkName(), + errorCondition != null ? errorCondition : NOT_APPLICABLE); + + if (errorCondition != null && sender.getCondition() == null) { + sender.setCondition(errorCondition); + } + + sender.close(); } @Override diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java index c116e2f5605b..cedf359bdcb3 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java @@ -17,9 +17,11 @@ import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.Target; import org.apache.qpid.proton.amqp.transaction.Coordinator; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; @@ -36,6 +38,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE; + /** * Represents an AMQP session using proton-j reactor. */ @@ -123,17 +127,27 @@ public boolean isDisposed() { */ @Override public void dispose() { + dispose(null); + } + + public void dispose(ErrorCondition errorCondition) { if (isDisposed.getAndSet(true)) { return; } - logger.info("connectionId[{}], sessionId[{}]: Disposing of session.", sessionHandler.getConnectionId(), - sessionName); + logger.info("connectionId[{}], sessionId[{}], errorCondition[{}]: Disposing of session.", + sessionHandler.getConnectionId(), sessionName, errorCondition != null ? errorCondition : NOT_APPLICABLE); - session.close(); + if (session.getLocalState() != EndpointState.CLOSED) { + session.close(); + + if (session.getCondition() == null) { + session.setCondition(errorCondition); + } + } - openReceiveLinks.forEach((key, link) -> link.dispose()); - openSendLinks.forEach((key, link) -> link.dispose()); + openReceiveLinks.forEach((key, link) -> link.dispose(errorCondition)); + openSendLinks.forEach((key, link) -> link.dispose(errorCondition)); } /** @@ -152,7 +166,6 @@ public Duration getOperationTimeout() { return openTimeout; } - /** * {@inheritDoc} */ @@ -261,7 +274,7 @@ private Mono createCoordinatorSendLink(Duration timeout, AmqpRetry } else { logger.info("linkName[{}]: Another coordinator send link exists. Disposing of new one.", TRANSACTION_LINK_NAME); - linkSubscription.dispose(); + linkSubscription.dispose(null); } sink.success(coordinatorLink.get().getLink()); @@ -311,7 +324,7 @@ private boolean removeLink(ConcurrentMap removed = openLinks.remove(key); if (removed != null) { - removed.dispose(); + removed.dispose(null); } return removed != null; @@ -550,7 +563,7 @@ private LinkSubscription getSubscription(String linkName, Strin return new LinkSubscription<>(reactorReceiver, subscription); } - private static final class LinkSubscription implements Disposable { + private static final class LinkSubscription { private final AtomicBoolean isDisposed = new AtomicBoolean(); private final T link; private final Disposable subscription; @@ -560,22 +573,26 @@ private LinkSubscription(T link, Disposable subscription) { this.subscription = subscription; } - public Disposable getSubscription() { - return subscription; - } - public T getLink() { return link; } - @Override - public void dispose() { + public void dispose(ErrorCondition errorCondition) { if (isDisposed.getAndSet(true)) { return; } + if (link instanceof ReactorReceiver) { + final ReactorReceiver reactorReceiver = (ReactorReceiver) link; + reactorReceiver.dispose(errorCondition); + } else if (link instanceof ReactorSender) { + final ReactorSender reactorSender = (ReactorSender) link; + reactorSender.dispose(errorCondition); + } else { + link.dispose(); + } + subscription.dispose(); - link.dispose(); } } } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java index 5c670c80d183..27486aabfeb5 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java @@ -248,9 +248,7 @@ void initialConnectionState() { // Assert StepVerifier.create(connection.getEndpointStates()) .expectNext(AmqpEndpointState.UNINITIALIZED) - .then(() -> { - connection.dispose(); - }) + .then(() -> connection.dispose()) .verifyComplete(); } @@ -364,7 +362,7 @@ void cannotCreateResourcesOnFailure() { } @Test - void cannotCreateSessionWhenDisposed() { + void closesDownstreamLinks() { } } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java index 40824916875a..061d2fa25c35 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java @@ -8,17 +8,18 @@ import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.AmqpRetryPolicy; import com.azure.core.amqp.ClaimsBasedSecurityNode; +import com.azure.core.amqp.exception.AmqpErrorCondition; import com.azure.core.amqp.exception.AmqpResponseCode; import com.azure.core.amqp.implementation.handler.SendLinkHandler; import com.azure.core.amqp.implementation.handler.SessionHandler; import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Record; import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; -import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.reactor.Reactor; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; @@ -191,7 +192,13 @@ void createProducerAgainAfterException() { final Duration timeout = Duration.ofSeconds(30); final TokenManager tokenManager = mock(TokenManager.class); final SendLinkHandler sendLinkHandler = new SendLinkHandler(ID, HOST, linkName, entityPath); - final Message message = mock(Message.class); + + final Event closeSendEvent = mock(Event.class); + when(closeSendEvent.getLink()).thenReturn(sender); + + final ErrorCondition errorCondition = new ErrorCondition( + Symbol.valueOf(AmqpErrorCondition.SERVER_BUSY_ERROR.getErrorCondition()), "test-busy"); + when(sender.getRemoteCondition()).thenReturn(errorCondition); when(session.sender(linkName)).thenReturn(sender); when(tokenManagerProvider.getTokenManager(cbsNodeSupplier, entityPath)).thenReturn(tokenManager); @@ -210,10 +217,8 @@ void createProducerAgainAfterException() { assertNotNull(sendLink); assertTrue(sendLink instanceof AmqpSendLink); - final AmqpSendLink reactorSendLink = (AmqpSendLink) sendLink; - - // Act & Assert - StepVerifier.create(reactorSendLink.send(message)); + // Act + sendLinkHandler.onLinkRemoteClose(closeSendEvent); } @Test From d8aae082fb882d1e52ccbdfd5304ac3d26f9daf1 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 9 Sep 2020 15:20:30 -0700 Subject: [PATCH 15/21] Fix build breaks in Service Bus. --- .../implementation/ServiceBusReactorReceiverTest.java | 1 - .../implementation/ServiceBusReactorSessionTest.java | 4 ---- 2 files changed, 5 deletions(-) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java index bc4d3848da80..6a5d79386fdb 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java @@ -102,7 +102,6 @@ void setup(TestInfo testInfo) throws IOException { when(receiveLinkHandler.getDeliveredMessages()).thenReturn(deliveryProcessor); when(receiveLinkHandler.getLinkName()).thenReturn(LINK_NAME); when(receiveLinkHandler.getEndpointStates()).thenReturn(endpointStates); - when(receiveLinkHandler.getErrors()).thenReturn(Flux.never()); when(tokenManager.getAuthorizationResults()).thenReturn(Flux.create(sink -> sink.next(AmqpResponseCode.OK))); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java index 2f4b1ea09b5c..60988b94fc83 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java @@ -146,7 +146,6 @@ void setup(TestInfo testInfo) throws IOException { sink1.next(EndpointState.ACTIVE); when(handler.getHostname()).thenReturn(HOSTNAME); when(handler.getConnectionId()).thenReturn(CONNECTION_ID); - when(handler.getErrors()).thenReturn(Flux.empty()); when(handlerProvider.createSendLinkHandler(CONNECTION_ID, HOSTNAME, viaEntityPathSenderLinkName, viaEntityPath)) .thenReturn(sendViaEntityLinkHandler); @@ -165,9 +164,6 @@ void setup(TestInfo testInfo) throws IOException { when(sendViaEntityLinkHandler.getEndpointStates()).thenReturn(endpointStateReplayProcessor); when(sendEntityLinkHandler.getEndpointStates()).thenReturn(endpointStateReplayProcessor); - when(sendViaEntityLinkHandler.getErrors()).thenReturn(Flux.empty()); - when(sendEntityLinkHandler.getErrors()).thenReturn(Flux.empty()); - when(tokenManagerProvider.getTokenManager(cbsNodeSupplier, viaEntityPath)).thenReturn(tokenManagerViaQueue); when(tokenManagerProvider.getTokenManager(cbsNodeSupplier, entityPath)).thenReturn(tokenManagerEntity); From 0aa1e2be0625c7b807fb51eb64f812d3fe871e64 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 9 Sep 2020 15:35:48 -0700 Subject: [PATCH 16/21] Fix indentation. --- .../amqp/implementation/ReactorReceiver.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java index 8ea3afd1ccc0..c8c75c3e798c 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java @@ -76,14 +76,14 @@ protected ReactorReceiver(String entityPath, Receiver receiver, ReceiveLinkHandl .subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED)); this.subscriptions = this.tokenManager.getAuthorizationResults().subscribe( - response -> { - logger.verbose("Token refreshed: {}", response); - hasAuthorized.set(true); - }, error -> { - logger.info("connectionId[{}], path[{}], linkName[{}] - tokenRenewalFailure[{}]", - handler.getConnectionId(), this.entityPath, getLinkName(), error.getMessage()); - hasAuthorized.set(false); - }, () -> hasAuthorized.set(false)); + response -> { + logger.verbose("Token refreshed: {}", response); + hasAuthorized.set(true); + }, error -> { + logger.info("connectionId[{}], path[{}], linkName[{}] - tokenRenewalFailure[{}]", + handler.getConnectionId(), this.entityPath, getLinkName(), error.getMessage()); + hasAuthorized.set(false); + }, () -> hasAuthorized.set(false)); } @Override From c6655e2dc2f0e72977cb170ab913644e0bd2b2fe Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 9 Sep 2020 18:27:40 -0700 Subject: [PATCH 17/21] Adding more logging. --- .../handler/ReceiveLinkHandler.java | 28 ++++++++++--------- .../handler/SendLinkHandler.java | 26 +++++++++-------- 2 files changed, 29 insertions(+), 25 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java index bb7474c63d68..f52756a50d9a 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java @@ -25,12 +25,14 @@ public class ReceiveLinkHandler extends LinkHandler { private final DirectProcessor deliveries; private final FluxSink deliverySink; private final Set queuedDeliveries = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final String entityPath; public ReceiveLinkHandler(String connectionId, String hostname, String linkName, String entityPath) { super(connectionId, hostname, entityPath, new ClientLogger(ReceiveLinkHandler.class)); this.deliveries = DirectProcessor.create(); this.deliverySink = deliveries.sink(FluxSink.OverflowStrategy.BUFFER); this.linkName = linkName; + this.entityPath = entityPath; } public String getLinkName() { @@ -58,8 +60,8 @@ public void close() { public void onLinkLocalOpen(Event event) { final Link link = event.getLink(); if (link instanceof Receiver) { - logger.info("onLinkLocalOpen connectionId[{}], linkName[{}], localSource[{}]", - getConnectionId(), link.getName(), link.getSource()); + logger.info("onLinkLocalOpen connectionId[{}], entityPath[{}], linkName[{}], localSource[{}]", + getConnectionId(), entityPath, link.getName(), link.getSource()); } } @@ -71,15 +73,15 @@ public void onLinkRemoteOpen(Event event) { } if (link.getRemoteSource() != null) { - logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], remoteSource[{}]", - getConnectionId(), link.getName(), link.getRemoteSource()); + logger.info("onLinkRemoteOpen connectionId[{}], entityPath[{}], linkName[{}], remoteSource[{}]", + getConnectionId(), entityPath, link.getName(), link.getRemoteSource()); if (isFirstResponse.getAndSet(false)) { onNext(EndpointState.ACTIVE); } } else { - logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], action[waitingForError]", - getConnectionId(), link.getName()); + logger.info("onLinkRemoteOpen connectionId[{}], entityPath[{}], linkName[{}], action[waitingForError]", + getConnectionId(), entityPath, link.getName()); } } @@ -102,9 +104,9 @@ public void onDelivery(Event event) { // before we fix proton-j - this work around ensures that we ignore the duplicate Delivery event if (delivery.isSettled()) { if (link != null) { - logger.verbose("onDelivery connectionId[{}], linkName[{}], updatedLinkCredit[{}], remoteCredit[{}]," - + " remoteCondition[{}], delivery.isSettled[{}]", - getConnectionId(), link.getName(), link.getCredit(), link.getRemoteCredit(), + logger.verbose("onDelivery connectionId[{}], entityPath[{}], linkName[{}], updatedLinkCredit[{}]," + + " remoteCredit[{}], remoteCondition[{}], delivery.isSettled[{}]", + getConnectionId(), entityPath, link.getName(), link.getCredit(), link.getRemoteCredit(), link.getRemoteCondition(), delivery.isSettled()); } else { logger.warning("connectionId[{}], delivery.isSettled[{}]", getConnectionId(), delivery.isSettled()); @@ -126,10 +128,10 @@ public void onDelivery(Event event) { } if (link != null) { - logger.verbose("onDelivery connectionId[{}], linkName[{}], updatedLinkCredit[{}], remoteCredit[{}]," - + " remoteCondition[{}], delivery.isPartial[{}]", - getConnectionId(), link.getName(), link.getCredit(), link.getRemoteCredit(), link.getRemoteCondition(), - delivery.isPartial()); + logger.verbose("onDelivery connectionId[{}], entityPath[{}], linkName[{}], updatedLinkCredit[{}]," + + "remoteCredit[{}], remoteCondition[{}], delivery.isPartial[{}]", + getConnectionId(), entityPath, link.getName(), link.getCredit(), link.getRemoteCredit(), + link.getRemoteCondition(), delivery.isPartial()); } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java index 4bd2bc8e7003..25e1fd353519 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java @@ -19,6 +19,7 @@ public class SendLinkHandler extends LinkHandler { private final String linkName; + private final String entityPath; private final AtomicBoolean isFirstFlow = new AtomicBoolean(true); private final UnicastProcessor creditProcessor = UnicastProcessor.create(); private final DirectProcessor deliveryProcessor = DirectProcessor.create(); @@ -28,6 +29,7 @@ public class SendLinkHandler extends LinkHandler { public SendLinkHandler(String connectionId, String hostname, String linkName, String entityPath) { super(connectionId, hostname, entityPath, new ClientLogger(SendLinkHandler.class)); this.linkName = linkName; + this.entityPath = entityPath; } public String getLinkName() { @@ -53,8 +55,8 @@ public void close() { public void onLinkLocalOpen(Event event) { final Link link = event.getLink(); if (link instanceof Sender) { - logger.verbose("onLinkLocalOpen connectionId[{}], linkName[{}], localTarget[{}]", - getConnectionId(), link.getName(), link.getTarget()); + logger.verbose("onLinkLocalOpen connectionId[{}], entityPath[{}], linkName[{}], localTarget[{}]", + getConnectionId(), entityPath, link.getName(), link.getTarget()); } } @@ -66,16 +68,16 @@ public void onLinkRemoteOpen(Event event) { } if (link.getRemoteTarget() != null) { - logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], remoteTarget[{}]", - getConnectionId(), link.getName(), link.getRemoteTarget()); + logger.info("onLinkRemoteOpen connectionId[{}], entityPath[{}], linkName[{}], remoteTarget[{}]", + getConnectionId(), entityPath, link.getName(), link.getRemoteTarget()); if (isFirstFlow.getAndSet(false)) { onNext(EndpointState.ACTIVE); } } else { - logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], remoteTarget[null], remoteSource[null], " - + "action[waitingForError]", - getConnectionId(), link.getName()); + logger.info("onLinkRemoteOpen connectionId[{}], entityPath[{}], linkName[{}], remoteTarget[null]," + + " remoteSource[null], action[waitingForError]", + getConnectionId(), entityPath, link.getName()); } } @@ -88,8 +90,8 @@ public void onLinkFlow(Event event) { final Sender sender = event.getSender(); creditSink.next(sender.getRemoteCredit()); - logger.verbose("onLinkFlow connectionId[{}], linkName[{}], unsettled[{}], credit[{}]", - getConnectionId(), sender.getName(), sender.getUnsettled(), sender.getCredit()); + logger.verbose("onLinkFlow connectionId[{}], entityPath[{}], linkName[{}], unsettled[{}], credit[{}]", + getConnectionId(), entityPath, sender.getName(), sender.getUnsettled(), sender.getCredit()); } @Override @@ -99,9 +101,9 @@ public void onDelivery(Event event) { while (delivery != null) { Sender sender = (Sender) delivery.getLink(); - logger.verbose("onDelivery connectionId[{}], linkName[{}], unsettled[{}], credit[{}], deliveryState[{}], " - + "delivery.isBuffered[{}], delivery.id[{}]", - getConnectionId(), sender.getName(), sender.getUnsettled(), sender.getRemoteCredit(), + logger.verbose("onDelivery connectionId[{}], entityPath[{}], linkName[{}], unsettled[{}], credit[{}]," + + " deliveryState[{}], delivery.isBuffered[{}], delivery.id[{}]", + getConnectionId(), entityPath, sender.getName(), sender.getUnsettled(), sender.getRemoteCredit(), delivery.getRemoteState(), delivery.isBuffered(), new String(delivery.getTag(), StandardCharsets.UTF_8)); From 7c75c65f3c5cdc8b0364c701fbaac91b74647ca3 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 9 Sep 2020 19:40:10 -0700 Subject: [PATCH 18/21] Removing unneeded retries. --- .../amqp/implementation/ReactorConnection.java | 14 +++++--------- .../core/amqp/implementation/ReactorSender.java | 12 +++--------- 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java index 9584b9452f82..a07616f5bf9f 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java @@ -46,7 +46,6 @@ public class ReactorConnection implements AmqpConnection { private final ClientLogger logger = new ClientLogger(ReactorConnection.class); private final ConcurrentMap sessionMap = new ConcurrentHashMap<>(); - private final AtomicBoolean hasConnection = new AtomicBoolean(); private final AtomicBoolean isDisposed = new AtomicBoolean(); private final DirectProcessor shutdownSignals = DirectProcessor.create(); private final FluxSink shutdownSignalsSink = shutdownSignals.sink(); @@ -104,8 +103,7 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption this.senderSettleMode = senderSettleMode; this.receiverSettleMode = receiverSettleMode; - this.connectionMono = Mono.fromCallable(this::getOrCreateConnection) - .doOnSubscribe(c -> hasConnection.set(true)); + this.connectionMono = Mono.fromCallable(this::getOrCreateConnection); this.endpointStates = this.handler.getEndpointStates() .takeUntilOther(shutdownSignals) @@ -138,14 +136,12 @@ public Mono getClaimsBasedSecurityNode() { "connectionId[%s]: Connection is disposed. Cannot get CBS node.", connectionId)))); } - final Mono cbsNodeMono = RetryUtil.withRetry( - getEndpointStates().takeUntil(x -> x == AmqpEndpointState.ACTIVE), - connectionOptions.getRetry().getTryTimeout(), retryPolicy) + final Mono cbsNodeMono = + RetryUtil.withRetry(getEndpointStates().takeUntil(x -> x == AmqpEndpointState.ACTIVE), + connectionOptions.getRetry().getTryTimeout(), retryPolicy) .then(Mono.fromCallable(this::getOrCreateCBSNode)); - return hasConnection.get() - ? cbsNodeMono - : connectionMono.then(cbsNodeMono); + return connectionMono.then(cbsNodeMono); } @Override diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java index 5b696ca957c5..d6c04c1523d5 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java @@ -323,15 +323,9 @@ public Mono send(byte[] bytes, int arrayOffset, int messageFormat } private Mono validateEndpoint() { - return Mono.defer(() -> { - if (hasConnected.get()) { - return Mono.empty(); - } else { - return RetryUtil.withRetry( - handler.getEndpointStates().takeUntil(state -> state == EndpointState.ACTIVE), timeout, retry) - .then(); - } - }); + return Mono.defer(() -> RetryUtil.withRetry( + handler.getEndpointStates().takeUntil(state -> state == EndpointState.ACTIVE), timeout, retry) + .then()); } /** From df0c5e6824a69799531b357332bb41292669db1d Mon Sep 17 00:00:00 2001 From: Connie Date: Thu, 10 Sep 2020 11:30:53 -0700 Subject: [PATCH 19/21] Remove empty tests. --- .../amqp/implementation/ReactorConnectionTest.java | 5 ----- .../implementation/handler/ReceiveLinkHandlerTest.java | 10 ---------- 2 files changed, 15 deletions(-) delete mode 100644 sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandlerTest.java diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java index 27486aabfeb5..e00b4c43d926 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java @@ -360,9 +360,4 @@ void cannotCreateResourcesOnFailure() { verify(transport, times(1)).unbind(); } - - @Test - void closesDownstreamLinks() { - - } } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandlerTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandlerTest.java deleted file mode 100644 index e33929742ccd..000000000000 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandlerTest.java +++ /dev/null @@ -1,10 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.amqp.implementation.handler; - -/** - * Tests for {@link ReceiveLinkHandler}. - */ -class ReceiveLinkHandlerTest { -} From 7d6215d129ed04e461ac19d94f3d9ac9ed1db778 Mon Sep 17 00:00:00 2001 From: Connie Date: Thu, 10 Sep 2020 11:32:23 -0700 Subject: [PATCH 20/21] Change methods to package-private. --- .../com/azure/core/amqp/implementation/ReactorConnection.java | 4 ++-- .../com/azure/core/amqp/implementation/ReactorReceiver.java | 2 +- .../com/azure/core/amqp/implementation/ReactorSender.java | 2 +- .../com/azure/core/amqp/implementation/ReactorSession.java | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java index a07616f5bf9f..c60fc0aa9a66 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java @@ -253,7 +253,7 @@ public void dispose() { "Disposed by client.")); } - public void dispose(ErrorCondition errorCondition) { + void dispose(ErrorCondition errorCondition) { if (isDisposed.getAndSet(true)) { return; } @@ -408,7 +408,7 @@ public AmqpSession getSession() { return session; } - public void dispose(ErrorCondition errorCondition) { + void dispose(ErrorCondition errorCondition) { if (isDisposed.getAndSet(true)) { return; } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java index c8c75c3e798c..673d6deea113 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java @@ -165,7 +165,7 @@ public void dispose() { * * @param condition Error condition associated with close operation. */ - public void dispose(ErrorCondition condition) { + void dispose(ErrorCondition condition) { if (isDisposed.getAndSet(true)) { return; } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java index d6c04c1523d5..0d11b882ea4b 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java @@ -291,7 +291,7 @@ public void dispose() { * * @param errorCondition Error condition associated with close operation. */ - public void dispose(ErrorCondition errorCondition) { + void dispose(ErrorCondition errorCondition) { if (isDisposed.getAndSet(true)) { return; } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java index cedf359bdcb3..c05e70c5e549 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java @@ -130,7 +130,7 @@ public void dispose() { dispose(null); } - public void dispose(ErrorCondition errorCondition) { + void dispose(ErrorCondition errorCondition) { if (isDisposed.getAndSet(true)) { return; } @@ -577,7 +577,7 @@ public T getLink() { return link; } - public void dispose(ErrorCondition errorCondition) { + void dispose(ErrorCondition errorCondition) { if (isDisposed.getAndSet(true)) { return; } From 874edddbcc926cb89e51c2d6730c5e70f89ff8c8 Mon Sep 17 00:00:00 2001 From: Connie Date: Thu, 10 Sep 2020 11:35:42 -0700 Subject: [PATCH 21/21] Add changelog entry --- sdk/core/azure-core-amqp/CHANGELOG.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md index 5b99af016d9d..ac29446ee8de 100644 --- a/sdk/core/azure-core-amqp/CHANGELOG.md +++ b/sdk/core/azure-core-amqp/CHANGELOG.md @@ -1,7 +1,9 @@ # Release History ## 1.5.0-beta.1 (Unreleased) -- Added Amqp Message envelope which can be accessed using `AmqpAnnotatedMessage`. +- Remove unused and duplicate logic for Handlers.getErrors(). +- Close children sessions and links when a connection is disposed. +- Added AMQP Message envelope which can be accessed using `AmqpAnnotatedMessage`. ## 1.4.0 (2020-08-11)