diff --git a/sdk/core/azure-core-amqp/CHANGELOG.md b/sdk/core/azure-core-amqp/CHANGELOG.md index 5b99af016d9d..ac29446ee8de 100644 --- a/sdk/core/azure-core-amqp/CHANGELOG.md +++ b/sdk/core/azure-core-amqp/CHANGELOG.md @@ -1,7 +1,9 @@ # Release History ## 1.5.0-beta.1 (Unreleased) -- Added Amqp Message envelope which can be accessed using `AmqpAnnotatedMessage`. +- Remove unused and duplicate logic for Handlers.getErrors(). +- Close children sessions and links when a connection is disposed. +- Added AMQP Message envelope which can be accessed using `AmqpAnnotatedMessage`. ## 1.4.0 (2020-08-11) diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpRetryOptions.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpRetryOptions.java index 0b266ef518f4..db3daea8af6c 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpRetryOptions.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpRetryOptions.java @@ -23,11 +23,11 @@ public class AmqpRetryOptions { * Creates an instance with the default retry options set. */ public AmqpRetryOptions() { - maxRetries = 3; - delay = Duration.ofMillis(800); - maxDelay = Duration.ofMinutes(1); - tryTimeout = Duration.ofMinutes(1); - retryMode = AmqpRetryMode.EXPONENTIAL; + this.maxRetries = 3; + this.delay = Duration.ofMillis(800); + this.maxDelay = Duration.ofMinutes(1); + this.tryTimeout = Duration.ofMinutes(1); + this.retryMode = AmqpRetryMode.EXPONENTIAL; } /** diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java index 8d67a2f86907..c60fc0aa9a66 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorConnection.java @@ -12,15 +12,16 @@ import com.azure.core.amqp.implementation.handler.ConnectionHandler; import com.azure.core.amqp.implementation.handler.SessionHandler; import com.azure.core.util.logging.ClientLogger; -import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.BaseHandler; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.reactor.Reactor; import reactor.core.Disposable; -import reactor.core.Disposables; import reactor.core.publisher.DirectProcessor; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; @@ -35,6 +36,8 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; +import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE; + public class ReactorConnection implements AmqpConnection { private static final String CBS_SESSION_NAME = "cbs-session"; private static final String CBS_ADDRESS = "$cbs"; @@ -43,12 +46,10 @@ public class ReactorConnection implements AmqpConnection { private final ClientLogger logger = new ClientLogger(ReactorConnection.class); private final ConcurrentMap sessionMap = new ConcurrentHashMap<>(); - private final AtomicBoolean hasConnection = new AtomicBoolean(); private final AtomicBoolean isDisposed = new AtomicBoolean(); private final DirectProcessor shutdownSignals = DirectProcessor.create(); - private final ReplayProcessor endpointStates = - ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED); - private FluxSink endpointStatesSink = endpointStates.sink(FluxSink.OverflowStrategy.BUFFER); + private final FluxSink shutdownSignalsSink = shutdownSignals.sink(); + private final ReplayProcessor endpointStates; private final String connectionId; private final Mono connectionMono; @@ -58,7 +59,6 @@ public class ReactorConnection implements AmqpConnection { private final MessageSerializer messageSerializer; private final ConnectionOptions connectionOptions; private final ReactorProvider reactorProvider; - private final Disposable.Composite subscriptions; private final AmqpRetryPolicy retryPolicy; private final SenderSettleMode senderSettleMode; private final ReceiverSettleMode receiverSettleMode; @@ -86,8 +86,8 @@ public class ReactorConnection implements AmqpConnection { */ public ReactorConnection(String connectionId, ConnectionOptions connectionOptions, ReactorProvider reactorProvider, ReactorHandlerProvider handlerProvider, TokenManagerProvider tokenManagerProvider, - MessageSerializer messageSerializer, String product, String clientVersion, - SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode) { + MessageSerializer messageSerializer, String product, String clientVersion, SenderSettleMode senderSettleMode, + ReceiverSettleMode receiverSettleMode) { this.connectionOptions = connectionOptions; this.reactorProvider = reactorProvider; @@ -103,26 +103,14 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption this.senderSettleMode = senderSettleMode; this.receiverSettleMode = receiverSettleMode; - this.connectionMono = Mono.fromCallable(this::getOrCreateConnection) - .doOnSubscribe(c -> hasConnection.set(true)); - - this.subscriptions = Disposables.composite( - this.handler.getEndpointStates().subscribe( - state -> { - logger.verbose("connectionId[{}]: Connection state: {}", connectionId, state); - endpointStatesSink.next(AmqpEndpointStateUtil.getConnectionState(state)); - }, error -> { - logger.error("connectionId[{}] Error occurred in connection endpoint.", connectionId, error); - endpointStatesSink.error(error); - }, () -> { - endpointStatesSink.next(AmqpEndpointState.CLOSED); - endpointStatesSink.complete(); - }), - - this.handler.getErrors().subscribe(error -> { - logger.error("connectionId[{}] Error occurred in connection handler.", connectionId, error); - endpointStatesSink.error(error); - })); + this.connectionMono = Mono.fromCallable(this::getOrCreateConnection); + + this.endpointStates = this.handler.getEndpointStates() + .takeUntilOther(shutdownSignals) + .map(state -> { + logger.verbose("connectionId[{}]: State {}", connectionId, state); + return AmqpEndpointStateUtil.getConnectionState(state); + }).subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED)); } /** @@ -148,14 +136,12 @@ public Mono getClaimsBasedSecurityNode() { "connectionId[%s]: Connection is disposed. Cannot get CBS node.", connectionId)))); } - final Mono cbsNodeMono = RetryUtil.withRetry( - getEndpointStates().takeUntil(x -> x == AmqpEndpointState.ACTIVE), - connectionOptions.getRetry().getTryTimeout(), retryPolicy) + final Mono cbsNodeMono = + RetryUtil.withRetry(getEndpointStates().takeUntil(x -> x == AmqpEndpointState.ACTIVE), + connectionOptions.getRetry().getTryTimeout(), retryPolicy) .then(Mono.fromCallable(this::getOrCreateCBSNode)); - return hasConnection.get() - ? cbsNodeMono - : connectionMono.then(cbsNodeMono); + return connectionMono.then(cbsNodeMono); } @Override @@ -249,17 +235,7 @@ protected AmqpSession createSession(String sessionName, Session session, Session */ @Override public boolean removeSession(String sessionName) { - if (sessionName == null) { - return false; - } - - final SessionSubscription removed = sessionMap.remove(sessionName); - - if (removed != null) { - removed.dispose(); - } - - return removed != null; + return removeSession(sessionName, null); } @Override @@ -272,18 +248,23 @@ public boolean isDisposed() { */ @Override public void dispose() { + dispose(null); + shutdownSignalsSink.next(new AmqpShutdownSignal(false, true, + "Disposed by client.")); + } + + void dispose(ErrorCondition errorCondition) { if (isDisposed.getAndSet(true)) { return; } - logger.info("connectionId[{}]: Disposing of ReactorConnection.", connectionId); - subscriptions.dispose(); - endpointStatesSink.complete(); + logger.info("connectionId[{}], errorCondition[{}]: Disposing of ReactorConnection.", connectionId, + errorCondition != null ? errorCondition : NOT_APPLICABLE); final String[] keys = sessionMap.keySet().toArray(new String[0]); for (String key : keys) { logger.info("connectionId[{}]: Removing session '{}'", connectionId, key); - removeSession(key); + removeSession(key, errorCondition); } if (connection != null) { @@ -331,6 +312,20 @@ protected Mono createRequestResponseChannel(String sessi new ClientLogger(String.format("%s<%s>", RequestResponseChannel.class, sessionName)))); } + private boolean removeSession(String sessionName, ErrorCondition errorCondition) { + if (sessionName == null) { + return false; + } + + final SessionSubscription removed = sessionMap.remove(sessionName); + + if (removed != null) { + removed.dispose(errorCondition); + } + + return removed != null; + } + private synchronized ClaimsBasedSecurityNode getOrCreateCBSNode() { if (cbsChannel == null) { logger.info("Setting CBS channel."); @@ -380,6 +375,7 @@ public void onConnectionError(Throwable exception) { getId(), getFullyQualifiedNamespace(), exception.getMessage()); endpointStates.onError(exception); + ReactorConnection.this.dispose(); } @Override @@ -393,16 +389,12 @@ void onConnectionShutdown(AmqpShutdownSignal shutdownSignal) { "onReactorError connectionId[{}], hostName[{}], message[Shutting down], shutdown signal[{}]", getId(), getFullyQualifiedNamespace(), shutdownSignal.isInitiatedByClient(), shutdownSignal); - if (!endpointStatesSink.isCancelled()) { - endpointStatesSink.next(AmqpEndpointState.CLOSED); - endpointStatesSink.complete(); - } - - dispose(); + dispose(new ErrorCondition(Symbol.getSymbol("onReactorError"), shutdownSignal.toString())); + shutdownSignalsSink.next(shutdownSignal); } } - private static final class SessionSubscription implements Disposable { + private static final class SessionSubscription { private final AtomicBoolean isDisposed = new AtomicBoolean(); private final AmqpSession session; private final Disposable subscription; @@ -412,22 +404,23 @@ private SessionSubscription(AmqpSession session, Disposable subscription) { this.subscription = subscription; } - public Disposable getSubscription() { - return subscription; - } - public AmqpSession getSession() { return session; } - @Override - public void dispose() { + void dispose(ErrorCondition errorCondition) { if (isDisposed.getAndSet(true)) { return; } + if (session instanceof ReactorSession) { + final ReactorSession reactorSession = (ReactorSession) session; + reactorSession.dispose(errorCondition); + } else { + session.dispose(); + } + subscription.dispose(); - session.dispose(); } } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java index 5e0fcd044034..673d6deea113 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorReceiver.java @@ -7,14 +7,14 @@ import com.azure.core.amqp.implementation.handler.ReceiveLinkHandler; import com.azure.core.util.logging.ClientLogger; import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.Message; import reactor.core.Disposable; -import reactor.core.Disposables; import reactor.core.publisher.EmitterProcessor; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.ReplayProcessor; import java.io.IOException; @@ -36,13 +36,11 @@ public class ReactorReceiver implements AmqpReceiveLink { private final ReceiveLinkHandler handler; private final TokenManager tokenManager; private final ReactorDispatcher dispatcher; - private final Disposable.Composite subscriptions; + private final Disposable subscriptions; private final AtomicBoolean isDisposed = new AtomicBoolean(); private final EmitterProcessor messagesProcessor; private final ClientLogger logger = new ClientLogger(ReactorReceiver.class); - private final ReplayProcessor endpointStates = - ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED); - private FluxSink endpointStateSink = endpointStates.sink(FluxSink.OverflowStrategy.BUFFER); + private final ReplayProcessor endpointStates; private final AtomicReference> creditSupplier = new AtomicReference<>(); @@ -69,43 +67,28 @@ protected ReactorReceiver(String entityPath, Receiver receiver, ReceiveLinkHandl } }) .subscribeWith(EmitterProcessor.create()); - - this.subscriptions = Disposables.composite( - this.handler.getEndpointStates().subscribe( - state -> { - logger.verbose("Connection state: {}", state); - endpointStateSink.next(AmqpEndpointStateUtil.getConnectionState(state)); - }, error -> { - logger.error("connectionId[{}] linkName[{}] entityPath[{}] Error occurred in connection.", - handler.getConnectionId(), receiver.getName(), entityPath, error); - endpointStateSink.error(error); - dispose(); - }, () -> { - endpointStateSink.next(AmqpEndpointState.CLOSED); - dispose(); - }), - - this.handler.getErrors().subscribe(error -> { - logger.error("connectionId[{}] linkName[{}] entityPath[{}] Error occurred in link.", - handler.getConnectionId(), receiver.getName(), entityPath, error); - endpointStateSink.error(error); - dispose(); - }), - - this.tokenManager.getAuthorizationResults().subscribe( - response -> { - logger.verbose("Token refreshed: {}", response); - hasAuthorized.set(true); - }, error -> { - logger.info("connectionId[{}], path[{}], linkName[{}] - tokenRenewalFailure[{}]", - handler.getConnectionId(), this.entityPath, getLinkName(), error.getMessage()); - hasAuthorized.set(false); - }, () -> hasAuthorized.set(false))); + this.endpointStates = this.handler.getEndpointStates() + .map(state -> { + logger.verbose("connectionId[{}], path[{}], linkName[{}]: State {}", handler.getConnectionId(), + entityPath, getLinkName(), state); + return AmqpEndpointStateUtil.getConnectionState(state); + }) + .subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED)); + + this.subscriptions = this.tokenManager.getAuthorizationResults().subscribe( + response -> { + logger.verbose("Token refreshed: {}", response); + hasAuthorized.set(true); + }, error -> { + logger.info("connectionId[{}], path[{}], linkName[{}] - tokenRenewalFailure[{}]", + handler.getConnectionId(), this.entityPath, getLinkName(), error.getMessage()); + hasAuthorized.set(false); + }, () -> hasAuthorized.set(false)); } @Override public Flux getEndpointStates() { - return endpointStates; + return endpointStates.distinct(); } @Override @@ -162,7 +145,6 @@ public void dispose() { } subscriptions.dispose(); - endpointStateSink.complete(); messagesProcessor.onComplete(); tokenManager.close(); receiver.close(); @@ -178,6 +160,41 @@ public void dispose() { } } + /** + * Disposes of the sender when an exception is encountered. + * + * @param condition Error condition associated with close operation. + */ + void dispose(ErrorCondition condition) { + if (isDisposed.getAndSet(true)) { + return; + } + + logger.verbose("connectionId[{}], path[{}], linkName[{}]: setting error condition {}", + handler.getConnectionId(), entityPath, getLinkName(), condition); + + if (receiver.getLocalState() != EndpointState.CLOSED) { + receiver.close(); + + if (receiver.getCondition() == null) { + receiver.setCondition(condition); + } + } + + try { + dispatcher.invoke(() -> { + receiver.free(); + handler.close(); + }); + } catch (IOException e) { + logger.warning("Could not schedule disposing of receiver on ReactorDispatcher.", e); + handler.close(); + } + + messagesProcessor.onComplete(); + tokenManager.close(); + } + protected Message decodeDelivery(Delivery delivery) { final int messageSize = delivery.pending(); final byte[] buffer = new byte[messageSize]; diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java index 066c0196626b..0d11b882ea4b 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSender.java @@ -21,6 +21,7 @@ import org.apache.qpid.proton.amqp.messaging.Released; import org.apache.qpid.proton.amqp.transaction.Declared; import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Sender; @@ -29,7 +30,6 @@ import reactor.core.Disposable; import reactor.core.Disposables; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; @@ -51,6 +51,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static com.azure.core.amqp.implementation.ClientConstants.MAX_AMQP_HEADER_SIZE_BYTES; +import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE; import static com.azure.core.amqp.implementation.ClientConstants.SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS; import static java.nio.charset.StandardCharsets.UTF_8; @@ -74,9 +75,7 @@ class ReactorSender implements AmqpSendLink { private final PriorityQueue pendingSendsQueue = new PriorityQueue<>(1000, new DeliveryTagComparator()); private final ClientLogger logger = new ClientLogger(ReactorSender.class); - private final ReplayProcessor endpointStates = - ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED); - private FluxSink endpointStateSink = endpointStates.sink(FluxSink.OverflowStrategy.BUFFER); + private final ReplayProcessor endpointStates; private final TokenManager tokenManager; private final MessageSerializer messageSerializer; @@ -101,43 +100,34 @@ class ReactorSender implements AmqpSendLink { this.retry = retry; this.timeout = timeout; + this.endpointStates = this.handler.getEndpointStates() + .map(state -> { + logger.verbose("connectionId[{}], path[{}], linkName[{}]: State {}", handler.getConnectionId(), + entityPath, getLinkName(), state); + this.hasConnected.set(state == EndpointState.ACTIVE); + return AmqpEndpointStateUtil.getConnectionState(state); + }).subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED)); + this.subscriptions = Disposables.composite( this.handler.getDeliveredMessages().subscribe(this::processDeliveredMessage), this.handler.getLinkCredits().subscribe(credit -> { - logger.verbose("Credits on link: {}", credit); + logger.verbose("connectionId[{}], entityPath[{}], linkName[{}]: Credits on link: {}", + handler.getConnectionId(), entityPath, getLinkName(), credit); this.scheduleWorkOnDispatcher(); - }), - - this.handler.getEndpointStates().subscribe( - state -> { - logger.verbose("[{}] Connection state: {}", entityPath, state); - this.hasConnected.set(state == EndpointState.ACTIVE); - endpointStateSink.next(AmqpEndpointStateUtil.getConnectionState(state)); - }, error -> { - logger.error("[{}] Error occurred in sender endpoint handler.", entityPath, error); - endpointStateSink.error(error); - }, () -> { - endpointStateSink.next(AmqpEndpointState.CLOSED); - endpointStateSink.complete(); - hasConnected.set(false); - }), - - this.handler.getErrors().subscribe(error -> { - logger.error("[{}] Error occurred in sender error handler.", entityPath, error); - endpointStateSink.error(error); }) ); if (tokenManager != null) { this.subscriptions.add(this.tokenManager.getAuthorizationResults().subscribe( response -> { - logger.verbose("Token refreshed: {}", response); + logger.verbose("connectionId[{}], entityPath[{}], linkName[{}]: Token refreshed: {}", + handler.getConnectionId(), entityPath, getLinkName(), response); hasAuthorized.set(true); }, error -> { - logger.info("clientId[{}], path[{}], linkName[{}] - tokenRenewalFailure[{}]", - handler.getConnectionId(), this.entityPath, getLinkName(), error.getMessage()); + logger.info("connectionId[{}], entityPath[{}], linkName[{}]: tokenRenewalFailure[{}]", + handler.getConnectionId(), entityPath, getLinkName(), error.getMessage()); hasAuthorized.set(false); }, () -> hasAuthorized.set(false))); } @@ -293,13 +283,35 @@ public boolean isDisposed() { @Override public void dispose() { + dispose(null); + } + + /** + * Disposes of the sender when an exception is encountered. + * + * @param errorCondition Error condition associated with close operation. + */ + void dispose(ErrorCondition errorCondition) { if (isDisposed.getAndSet(true)) { return; } subscriptions.dispose(); - endpointStateSink.complete(); tokenManager.close(); + + if (sender.getLocalState() == EndpointState.CLOSED) { + return; + } + + logger.verbose("connectionId[{}], path[{}], linkName[{}]: setting error condition {}", + handler.getConnectionId(), entityPath, getLinkName(), + errorCondition != null ? errorCondition : NOT_APPLICABLE); + + if (errorCondition != null && sender.getCondition() == null) { + sender.setCondition(errorCondition); + } + + sender.close(); } @Override @@ -311,15 +323,9 @@ public Mono send(byte[] bytes, int arrayOffset, int messageFormat } private Mono validateEndpoint() { - return Mono.defer(() -> { - if (hasConnected.get()) { - return Mono.empty(); - } else { - return RetryUtil.withRetry( - handler.getEndpointStates().takeUntil(state -> state == EndpointState.ACTIVE), timeout, retry) - .then(); - } - }); + return Mono.defer(() -> RetryUtil.withRetry( + handler.getEndpointStates().takeUntil(state -> state == EndpointState.ACTIVE), timeout, retry) + .then()); } /** diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java index 6330477d472e..c05e70c5e549 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/ReactorSession.java @@ -17,16 +17,16 @@ import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.Target; import org.apache.qpid.proton.amqp.transaction.Coordinator; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.BaseHandler; +import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; import reactor.core.Disposable; -import reactor.core.Disposables; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.ReplayProcessor; @@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE; + /** * Represents an AMQP session using proton-j reactor. */ @@ -49,9 +51,7 @@ public class ReactorSession implements AmqpSession { private final AtomicBoolean isDisposed = new AtomicBoolean(); private final ClientLogger logger = new ClientLogger(ReactorSession.class); - private final ReplayProcessor endpointStates = - ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED); - private FluxSink endpointStateSink = endpointStates.sink(FluxSink.OverflowStrategy.BUFFER); + private final ReplayProcessor endpointStates; private final Session session; private final SessionHandler sessionHandler; @@ -61,14 +61,13 @@ public class ReactorSession implements AmqpSession { private final MessageSerializer messageSerializer; private final Duration openTimeout; - private final Disposable.Composite subscriptions; private final ReactorHandlerProvider handlerProvider; private final Mono cbsNodeSupplier; private final AtomicReference> coordinatorLink = new AtomicReference<>(); private final AtomicReference transactionCoordinator = new AtomicReference<>(); - private AmqpRetryPolicy retryPolicy; + private final AmqpRetryPolicy retryPolicy; /** * Creates a new AMQP session using proton-j. @@ -98,27 +97,13 @@ public ReactorSession(Session session, SessionHandler sessionHandler, String ses this.messageSerializer = messageSerializer; this.openTimeout = openTimeout; this.retryPolicy = retryPolicy; - - this.subscriptions = Disposables.composite( - this.sessionHandler.getEndpointStates().subscribe( - state -> { - logger.verbose("Connection state: {}", state); - endpointStateSink.next(AmqpEndpointStateUtil.getConnectionState(state)); - }, error -> { - logger.error("[{}] Error occurred in session endpoint handler.", sessionName, error); - endpointStateSink.error(error); - dispose(); - }, () -> { - endpointStateSink.next(AmqpEndpointState.CLOSED); - endpointStateSink.complete(); - dispose(); - }), - - this.sessionHandler.getErrors().subscribe(error -> { - logger.error("[{}] Error occurred in session error handler.", sessionName, error); - endpointStateSink.error(error); - dispose(); - })); + this.endpointStates = sessionHandler.getEndpointStates() + .map(state -> { + logger.verbose("connectionId[{}], sessionName[{}]: State ", sessionHandler.getConnectionId(), + sessionName, state); + return AmqpEndpointStateUtil.getConnectionState(state); + }) + .subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED)); session.open(); } @@ -142,20 +127,27 @@ public boolean isDisposed() { */ @Override public void dispose() { + dispose(null); + } + + void dispose(ErrorCondition errorCondition) { if (isDisposed.getAndSet(true)) { return; } - logger.info("sessionId[{}]: Disposing of session.", sessionName); + logger.info("connectionId[{}], sessionId[{}], errorCondition[{}]: Disposing of session.", + sessionHandler.getConnectionId(), sessionName, errorCondition != null ? errorCondition : NOT_APPLICABLE); - session.close(); - subscriptions.dispose(); + if (session.getLocalState() != EndpointState.CLOSED) { + session.close(); - openReceiveLinks.forEach((key, link) -> link.dispose()); - openReceiveLinks.clear(); + if (session.getCondition() == null) { + session.setCondition(errorCondition); + } + } - openSendLinks.forEach((key, link) -> link.dispose()); - openSendLinks.clear(); + openReceiveLinks.forEach((key, link) -> link.dispose(errorCondition)); + openSendLinks.forEach((key, link) -> link.dispose(errorCondition)); } /** @@ -174,7 +166,6 @@ public Duration getOperationTimeout() { return openTimeout; } - /** * {@inheritDoc} */ @@ -283,7 +274,7 @@ private Mono createCoordinatorSendLink(Duration timeout, AmqpRetry } else { logger.info("linkName[{}]: Another coordinator send link exists. Disposing of new one.", TRANSACTION_LINK_NAME); - linkSubscription.dispose(); + linkSubscription.dispose(null); } sink.success(coordinatorLink.get().getLink()); @@ -333,7 +324,7 @@ private boolean removeLink(ConcurrentMap removed = openLinks.remove(key); if (removed != null) { - removed.dispose(); + removed.dispose(null); } return removed != null; @@ -361,8 +352,8 @@ private boolean removeLink(ConcurrentMap createConsumer(String linkName, String entityPath, Duration timeout, - AmqpRetryPolicy retry, Map sourceFilters, - Map receiverProperties, Symbol[] receiverDesiredCapabilities, SenderSettleMode senderSettleMode, + AmqpRetryPolicy retry, Map sourceFilters, Map receiverProperties, + Symbol[] receiverDesiredCapabilities, SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode) { if (isDisposed()) { @@ -445,7 +436,7 @@ protected Mono createProducer(String linkName, String entityPath, Dura return RetryUtil.withRetry( getEndpointStates().takeUntil(state -> state == AmqpEndpointState.ACTIVE), - timeout, retry).then(tokenManager.authorize()).then(Mono.create(sink -> { + timeout, retry).then(tokenManager.authorize()).then(Mono.create(sink -> { try { // We have to invoke this in the same thread or else proton-j will not properly link up the created // sender because the link names are not unique. Link name == entity path. @@ -572,7 +563,7 @@ private LinkSubscription getSubscription(String linkName, Strin return new LinkSubscription<>(reactorReceiver, subscription); } - private static final class LinkSubscription implements Disposable { + private static final class LinkSubscription { private final AtomicBoolean isDisposed = new AtomicBoolean(); private final T link; private final Disposable subscription; @@ -582,22 +573,26 @@ private LinkSubscription(T link, Disposable subscription) { this.subscription = subscription; } - public Disposable getSubscription() { - return subscription; - } - public T getLink() { return link; } - @Override - public void dispose() { + void dispose(ErrorCondition errorCondition) { if (isDisposed.getAndSet(true)) { return; } + if (link instanceof ReactorReceiver) { + final ReactorReceiver reactorReceiver = (ReactorReceiver) link; + reactorReceiver.dispose(errorCondition); + } else if (link instanceof ReactorSender) { + final ReactorSender reactorSender = (ReactorSender) link; + reactorSender.dispose(errorCondition); + } else { + link.dispose(); + } + subscription.dispose(); - link.dispose(); } } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java index 0bb755be2a84..d2f7be85eb82 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RequestResponseChannel.java @@ -138,12 +138,10 @@ protected RequestResponseChannel(String connectionId, String fullyQualifiedNames receiveLinkHandler.getEndpointStates().subscribe( state -> endpointStatesSink.next(AmqpEndpointStateUtil.getConnectionState(state)), this::handleError, this::dispose), - receiveLinkHandler.getErrors().subscribe(this::handleError), sendLinkHandler.getEndpointStates().subscribe(state -> endpointStatesSink.next(AmqpEndpointStateUtil.getConnectionState(state)), - this::handleError, this::dispose), - sendLinkHandler.getErrors().subscribe(this::handleError) + this::handleError, this::dispose) ); //@formatter:on diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ConnectionHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ConnectionHandler.java index 0858cd22bf44..4c73984d1f3a 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ConnectionHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ConnectionHandler.java @@ -152,7 +152,6 @@ public void onTransportError(Event event) { if (connection != null) { notifyErrorContext(connection, condition); - onNext(connection.getRemoteState()); } // onTransportError event is not handled by the global IO Handler for cleanup @@ -172,7 +171,6 @@ public void onTransportClosed(Event event) { if (connection != null) { notifyErrorContext(connection, condition); - onNext(connection.getRemoteState()); } } @@ -208,8 +206,6 @@ public void onConnectionLocalClose(Event event) { transport.unbind(); // we proactively dispose IO even if service fails to close } } - - onNext(connection.getRemoteState()); } @Override @@ -218,9 +214,11 @@ public void onConnectionRemoteClose(Event event) { final ErrorCondition error = connection.getRemoteCondition(); logErrorCondition("onConnectionRemoteClose", connection, error); - - onNext(connection.getRemoteState()); - notifyErrorContext(connection, error); + if (error == null) { + onNext(connection.getRemoteState()); + } else { + notifyErrorContext(connection, error); + } } @Override @@ -262,7 +260,7 @@ private void notifyErrorContext(Connection connection, ErrorCondition condition) final Throwable exception = ExceptionUtil.toException(condition.getCondition().toString(), condition.getDescription(), getErrorContext()); - onNext(exception); + onError(exception); } private void logErrorCondition(String eventName, Connection connection, ErrorCondition error) { diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/CustomIOHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/CustomIOHandler.java index d000bdb44717..1258d7698783 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/CustomIOHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/CustomIOHandler.java @@ -9,6 +9,8 @@ import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.reactor.impl.IOHandler; +import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE; + public class CustomIOHandler extends IOHandler { private final ClientLogger logger = new ClientLogger(CustomIOHandler.class); private final String connectionId; @@ -23,7 +25,7 @@ public void onTransportClosed(Event event) { final Connection connection = event.getConnection(); logger.info("onTransportClosed connectionId[{}], hostname[{}]", - connectionId, (connection != null ? connection.getHostname() : "n/a")); + connectionId, (connection != null ? connection.getHostname() : NOT_APPLICABLE)); if (transport != null && connection != null && connection.getTransport() != null) { transport.unbind(); diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/DispatchHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/DispatchHandler.java index 23293da62c2f..75b7ec0d330d 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/DispatchHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/DispatchHandler.java @@ -23,8 +23,7 @@ public class DispatchHandler extends BaseHandler { * @param work The work to run on the {@link Reactor}. */ public DispatchHandler(Runnable work) { - Objects.requireNonNull(work); - this.work = work; + this.work = Objects.requireNonNull(work, "'work' cannot be null."); } /** @@ -32,7 +31,7 @@ public DispatchHandler(Runnable work) { */ @Override public void onTimerTask(Event e) { - logger.verbose("Running task for event: %s", e); + logger.verbose("Running task for event: {}", e); this.work.run(); } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java index c68f3598a81f..4ddbde2218a9 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/Handler.java @@ -8,16 +8,15 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.ReplayProcessor; -import reactor.core.publisher.UnicastProcessor; import java.io.Closeable; +import java.util.concurrent.atomic.AtomicBoolean; public abstract class Handler extends BaseHandler implements Closeable { + private final AtomicBoolean isTerminal = new AtomicBoolean(); private final ReplayProcessor endpointStateProcessor = ReplayProcessor.cacheLastOrDefault(EndpointState.UNINITIALIZED); - private final UnicastProcessor errorContextProcessor = UnicastProcessor.create(); private final FluxSink endpointSink = endpointStateProcessor.sink(); - private final FluxSink errorSink = errorContextProcessor.sink(); private final String connectionId; private final String hostname; @@ -38,25 +37,26 @@ public Flux getEndpointStates() { return endpointStateProcessor.distinct(); } - public Flux getErrors() { - return errorContextProcessor; - } - void onNext(EndpointState state) { endpointSink.next(state); + } - if (state == EndpointState.CLOSED) { - endpointSink.complete(); + void onError(Throwable error) { + if (isTerminal.getAndSet(true)) { + return; } - } - void onNext(Throwable context) { - errorSink.next(context); + endpointSink.next(EndpointState.CLOSED); + endpointSink.error(error); } @Override public void close() { + if (isTerminal.getAndSet(true)) { + return; + } + + endpointSink.next(EndpointState.CLOSED); endpointSink.complete(); - errorSink.complete(); } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java index e347bcc293bf..8ea445cb91ed 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/LinkHandler.java @@ -5,7 +5,6 @@ import com.azure.core.amqp.exception.AmqpErrorContext; import com.azure.core.amqp.exception.LinkErrorContext; -import com.azure.core.amqp.implementation.ClientConstants; import com.azure.core.amqp.implementation.ExceptionUtil; import com.azure.core.util.logging.ClientLogger; import org.apache.qpid.proton.amqp.transport.ErrorCondition; @@ -14,11 +13,11 @@ import org.apache.qpid.proton.engine.Link; import static com.azure.core.amqp.implementation.AmqpErrorCode.TRACKING_ID_PROPERTY; +import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE; abstract class LinkHandler extends Handler { - private final String entityPath; - ClientLogger logger; + final ClientLogger logger; LinkHandler(String connectionId, String hostname, String entityPath, ClientLogger logger) { super(connectionId, hostname); @@ -32,9 +31,10 @@ public void onLinkLocalClose(Event event) { final ErrorCondition condition = link.getCondition(); logger.info("onLinkLocalClose connectionId[{}], linkName[{}], errorCondition[{}], errorDescription[{}]", - getConnectionId(), link.getName(), - condition != null ? condition.getCondition() : ClientConstants.NOT_APPLICABLE, - condition != null ? condition.getDescription() : ClientConstants.NOT_APPLICABLE); + getConnectionId(), + link.getName(), + condition != null ? condition.getCondition() : NOT_APPLICABLE, + condition != null ? condition.getDescription() : NOT_APPLICABLE); } @Override @@ -44,8 +44,8 @@ public void onLinkRemoteClose(Event event) { logger.info("onLinkRemoteClose connectionId[{}], linkName[{}], errorCondition[{}], errorDescription[{}]", getConnectionId(), link.getName(), - condition != null ? condition.getCondition() : ClientConstants.NOT_APPLICABLE, - condition != null ? condition.getDescription() : ClientConstants.NOT_APPLICABLE); + condition != null ? condition.getCondition() : NOT_APPLICABLE, + condition != null ? condition.getDescription() : NOT_APPLICABLE); handleRemoteLinkClosed(event); } @@ -57,15 +57,18 @@ public void onLinkRemoteDetach(Event event) { logger.info("onLinkRemoteClose connectionId[{}], linkName[{}], errorCondition[{}], errorDescription[{}]", getConnectionId(), link.getName(), - condition != null ? condition.getCondition() : ClientConstants.NOT_APPLICABLE, - condition != null ? condition.getDescription() : ClientConstants.NOT_APPLICABLE); + condition != null ? condition.getCondition() : NOT_APPLICABLE, + condition != null ? condition.getDescription() : NOT_APPLICABLE); handleRemoteLinkClosed(event); } @Override public void onLinkFinal(Event event) { - logger.info("onLinkFinal connectionId[{}], linkName[{}]", getConnectionId(), event.getLink().getName()); + final String linkName = event != null && event.getLink() != null + ? event.getLink().getName() + : NOT_APPLICABLE; + logger.info("onLinkFinal connectionId[{}], linkName[{}]", getConnectionId(), linkName); close(); } @@ -80,22 +83,6 @@ public AmqpErrorContext getErrorContext(Link link) { return new LinkErrorContext(getHostname(), entityPath, referenceId, link.getCredit()); } - private void processOnClose(Link link, ErrorCondition condition) { - logger.info("processOnClose connectionId[{}], linkName[{}], errorCondition[{}], errorDescription[{}]", - getConnectionId(), link.getName(), - condition != null ? condition.getCondition() : ClientConstants.NOT_APPLICABLE, - condition != null ? condition.getDescription() : ClientConstants.NOT_APPLICABLE); - - if (condition != null && condition.getCondition() != null) { - final Throwable exception = ExceptionUtil.toException(condition.getCondition().toString(), - condition.getDescription(), getErrorContext(link)); - - onNext(exception); - } - - onNext(EndpointState.CLOSED); - } - private void handleRemoteLinkClosed(final Event event) { final Link link = event.getLink(); final ErrorCondition condition = link.getRemoteCondition(); @@ -105,6 +92,18 @@ private void handleRemoteLinkClosed(final Event event) { link.close(); } - processOnClose(link, condition); + logger.info("processOnClose connectionId[{}], linkName[{}], errorCondition[{}], errorDescription[{}]", + getConnectionId(), link.getName(), + condition != null ? condition.getCondition() : NOT_APPLICABLE, + condition != null ? condition.getDescription() : NOT_APPLICABLE); + + if (condition != null && condition.getCondition() != null) { + final Throwable exception = ExceptionUtil.toException(condition.getCondition().toString(), + condition.getDescription(), getErrorContext(link)); + + onError(exception); + } else { + close(); + } } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java index 0017fed355f9..f52756a50d9a 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/ReceiveLinkHandler.java @@ -4,9 +4,6 @@ package com.azure.core.amqp.implementation.handler; import com.azure.core.util.logging.ClientLogger; -import java.util.Collections; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import org.apache.qpid.proton.amqp.messaging.Modified; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; @@ -17,20 +14,25 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; public class ReceiveLinkHandler extends LinkHandler { private final String linkName; - private AtomicBoolean isFirstResponse = new AtomicBoolean(true); + private final AtomicBoolean isFirstResponse = new AtomicBoolean(true); private final DirectProcessor deliveries; - private FluxSink deliverySink; - private Set queuedDeliveries = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final FluxSink deliverySink; + private final Set queuedDeliveries = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final String entityPath; public ReceiveLinkHandler(String connectionId, String hostname, String linkName, String entityPath) { super(connectionId, hostname, entityPath, new ClientLogger(ReceiveLinkHandler.class)); this.deliveries = DirectProcessor.create(); this.deliverySink = deliveries.sink(FluxSink.OverflowStrategy.BUFFER); this.linkName = linkName; + this.entityPath = entityPath; } public String getLinkName() { @@ -38,11 +40,9 @@ public String getLinkName() { } public Flux getDeliveredMessages() { - return deliveries - .doOnNext(this::removeQueuedDelivery); + return deliveries.doOnNext(delivery -> queuedDeliveries.remove(delivery)); } - @Override public void close() { deliverySink.complete(); @@ -60,26 +60,28 @@ public void close() { public void onLinkLocalOpen(Event event) { final Link link = event.getLink(); if (link instanceof Receiver) { - logger.info("onLinkLocalOpen connectionId[{}], linkName[{}], localSource[{}]", - getConnectionId(), link.getName(), link.getSource()); + logger.info("onLinkLocalOpen connectionId[{}], entityPath[{}], linkName[{}], localSource[{}]", + getConnectionId(), entityPath, link.getName(), link.getSource()); } } @Override public void onLinkRemoteOpen(Event event) { final Link link = event.getLink(); - if (link instanceof Receiver) { - if (link.getRemoteSource() != null) { - logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], remoteSource[{}]", - getConnectionId(), link.getName(), link.getRemoteSource()); + if (!(link instanceof Receiver)) { + return; + } - if (isFirstResponse.getAndSet(false)) { - onNext(EndpointState.ACTIVE); - } - } else { - logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], action[waitingForError]", - getConnectionId(), link.getName()); + if (link.getRemoteSource() != null) { + logger.info("onLinkRemoteOpen connectionId[{}], entityPath[{}], linkName[{}], remoteSource[{}]", + getConnectionId(), entityPath, link.getName(), link.getRemoteSource()); + + if (isFirstResponse.getAndSet(false)) { + onNext(EndpointState.ACTIVE); } + } else { + logger.info("onLinkRemoteOpen connectionId[{}], entityPath[{}], linkName[{}], action[waitingForError]", + getConnectionId(), entityPath, link.getName()); } } @@ -102,9 +104,9 @@ public void onDelivery(Event event) { // before we fix proton-j - this work around ensures that we ignore the duplicate Delivery event if (delivery.isSettled()) { if (link != null) { - logger.verbose("onDelivery connectionId[{}], linkName[{}], updatedLinkCredit[{}], remoteCredit[{}]," - + " remoteCondition[{}], delivery.isSettled[{}]", - getConnectionId(), link.getName(), link.getCredit(), link.getRemoteCredit(), + logger.verbose("onDelivery connectionId[{}], entityPath[{}], linkName[{}], updatedLinkCredit[{}]," + + " remoteCredit[{}], remoteCondition[{}], delivery.isSettled[{}]", + getConnectionId(), entityPath, link.getName(), link.getCredit(), link.getRemoteCredit(), link.getRemoteCondition(), delivery.isSettled()); } else { logger.warning("connectionId[{}], delivery.isSettled[{}]", getConnectionId(), delivery.isSettled()); @@ -126,20 +128,16 @@ public void onDelivery(Event event) { } if (link != null) { - logger.verbose("onDelivery connectionId[{}], linkName[{}], updatedLinkCredit[{}], remoteCredit[{}]," - + " remoteCondition[{}], delivery.isPartial[{}]", - getConnectionId(), link.getName(), link.getCredit(), link.getRemoteCredit(), link.getRemoteCondition(), - delivery.isPartial()); + logger.verbose("onDelivery connectionId[{}], entityPath[{}], linkName[{}], updatedLinkCredit[{}]," + + "remoteCredit[{}], remoteCondition[{}], delivery.isPartial[{}]", + getConnectionId(), entityPath, link.getName(), link.getCredit(), link.getRemoteCredit(), + link.getRemoteCondition(), delivery.isPartial()); } } @Override public void onLinkRemoteClose(Event event) { - super.onLinkRemoteClose(event); deliverySink.complete(); - } - - private void removeQueuedDelivery(Delivery delivery) { - queuedDeliveries.remove(delivery); + super.onLinkRemoteClose(event); } } diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java index 1d92def80e0f..25e1fd353519 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SendLinkHandler.java @@ -18,16 +18,22 @@ import java.util.concurrent.atomic.AtomicBoolean; public class SendLinkHandler extends LinkHandler { - private final String senderName; + private final String linkName; + private final String entityPath; private final AtomicBoolean isFirstFlow = new AtomicBoolean(true); private final UnicastProcessor creditProcessor = UnicastProcessor.create(); private final DirectProcessor deliveryProcessor = DirectProcessor.create(); private final FluxSink creditSink = creditProcessor.sink(); private final FluxSink deliverySink = deliveryProcessor.sink(); - public SendLinkHandler(String connectionId, String hostname, String senderName, String entityPath) { + public SendLinkHandler(String connectionId, String hostname, String linkName, String entityPath) { super(connectionId, hostname, entityPath, new ClientLogger(SendLinkHandler.class)); - this.senderName = senderName; + this.linkName = linkName; + this.entityPath = entityPath; + } + + public String getLinkName() { + return linkName; } public Flux getLinkCredits() { @@ -49,27 +55,29 @@ public void close() { public void onLinkLocalOpen(Event event) { final Link link = event.getLink(); if (link instanceof Sender) { - logger.verbose("onLinkLocalOpen connectionId[{}], linkName[{}], localTarget[{}]", - getConnectionId(), link.getName(), link.getTarget()); + logger.verbose("onLinkLocalOpen connectionId[{}], entityPath[{}], linkName[{}], localTarget[{}]", + getConnectionId(), entityPath, link.getName(), link.getTarget()); } } @Override public void onLinkRemoteOpen(Event event) { final Link link = event.getLink(); - if (link instanceof Sender) { - if (link.getRemoteTarget() != null) { - logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], remoteTarget[{}]", - getConnectionId(), link.getName(), link.getRemoteTarget()); - - if (isFirstFlow.getAndSet(false)) { - onNext(EndpointState.ACTIVE); - } - } else { - logger.info("onLinkRemoteOpen connectionId[{}], linkName[{}], remoteTarget[null], remoteSource[null], " - + "action[waitingForError]", - getConnectionId(), link.getName()); + if (!(link instanceof Sender)) { + return; + } + + if (link.getRemoteTarget() != null) { + logger.info("onLinkRemoteOpen connectionId[{}], entityPath[{}], linkName[{}], remoteTarget[{}]", + getConnectionId(), entityPath, link.getName(), link.getRemoteTarget()); + + if (isFirstFlow.getAndSet(false)) { + onNext(EndpointState.ACTIVE); } + } else { + logger.info("onLinkRemoteOpen connectionId[{}], entityPath[{}], linkName[{}], remoteTarget[null]," + + " remoteSource[null], action[waitingForError]", + getConnectionId(), entityPath, link.getName()); } } @@ -82,8 +90,8 @@ public void onLinkFlow(Event event) { final Sender sender = event.getSender(); creditSink.next(sender.getRemoteCredit()); - logger.verbose("onLinkFlow connectionId[{}], linkName[{}], unsettled[{}], credit[{}]", - getConnectionId(), sender.getName(), sender.getUnsettled(), sender.getCredit()); + logger.verbose("onLinkFlow connectionId[{}], entityPath[{}], linkName[{}], unsettled[{}], credit[{}]", + getConnectionId(), entityPath, sender.getName(), sender.getUnsettled(), sender.getCredit()); } @Override @@ -93,9 +101,9 @@ public void onDelivery(Event event) { while (delivery != null) { Sender sender = (Sender) delivery.getLink(); - logger.verbose("onDelivery connectionId[{}], linkName[{}], unsettled[{}], credit[{}], deliveryState[{}], " - + "delivery.isBuffered[{}], delivery.id[{}]", - getConnectionId(), sender.getName(), sender.getUnsettled(), sender.getRemoteCredit(), + logger.verbose("onDelivery connectionId[{}], entityPath[{}], linkName[{}], unsettled[{}], credit[{}]," + + " deliveryState[{}], delivery.isBuffered[{}], delivery.id[{}]", + getConnectionId(), entityPath, sender.getName(), sender.getUnsettled(), sender.getRemoteCredit(), delivery.getRemoteState(), delivery.isBuffered(), new String(delivery.getTag(), StandardCharsets.UTF_8)); diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java index 4c9b47d2beb6..0fb9aee7786b 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/handler/SessionHandler.java @@ -63,20 +63,23 @@ public void onSessionLocalOpen(Event e) { getConnectionId(), this.entityName, ioException.getMessage()); final Throwable exception = new AmqpException(false, message, ioException, getErrorContext()); - onNext(exception); + onError(exception); } } @Override public void onSessionRemoteOpen(Event e) { final Session session = e.getSession(); - - logger.info( - "onSessionRemoteOpen connectionId[{}], entityName[{}], sessionIncCapacity[{}], sessionOutgoingWindow[{}]", - getConnectionId(), entityName, session.getIncomingCapacity(), session.getOutgoingWindow()); - if (session.getLocalState() == EndpointState.UNINITIALIZED) { + logger.warning("onSessionRemoteOpen connectionId[{}], entityName[{}], sessionIncCapacity[{}]," + + " sessionOutgoingWindow[{}] endpoint was uninitialised.", + getConnectionId(), entityName, session.getIncomingCapacity(), session.getOutgoingWindow()); + session.open(); + } else { + logger.info("onSessionRemoteOpen connectionId[{}], entityName[{}], sessionIncCapacity[{}]," + + " sessionOutgoingWindow[{}]", + getConnectionId(), entityName, session.getIncomingCapacity(), session.getOutgoingWindow()); } onNext(EndpointState.ACTIVE); @@ -117,26 +120,25 @@ public void onSessionRemoteClose(Event e) { session.close(); } - onNext(EndpointState.CLOSED); - - if (condition != null) { + if (condition == null) { + onNext(EndpointState.CLOSED); + } else { final String id = getConnectionId(); final AmqpErrorContext context = getErrorContext(); final Exception exception; if (condition.getCondition() == null) { - exception = new AmqpException(false, - String.format(Locale.US, + exception = new AmqpException(false, String.format(Locale.US, "onSessionRemoteClose connectionId[%s], entityName[%s], condition[%s]", id, entityName, condition), context); } else { - exception = ExceptionUtil.toException(condition.getCondition().toString(), - String.format(Locale.US, "onSessionRemoteClose connectionId[%s], entityName[%s]", id, + exception = ExceptionUtil.toException(condition.getCondition().toString(), String.format(Locale.US, + "onSessionRemoteClose connectionId[%s], entityName[%s]", id, entityName), context); } - onNext(exception); + onError(exception); } } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java index 5c670c80d183..e00b4c43d926 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorConnectionTest.java @@ -248,9 +248,7 @@ void initialConnectionState() { // Assert StepVerifier.create(connection.getEndpointStates()) .expectNext(AmqpEndpointState.UNINITIALIZED) - .then(() -> { - connection.dispose(); - }) + .then(() -> connection.dispose()) .verifyComplete(); } @@ -362,9 +360,4 @@ void cannotCreateResourcesOnFailure() { verify(transport, times(1)).unbind(); } - - @Test - void cannotCreateSessionWhenDisposed() { - - } } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverTest.java index cfbc5b53546c..b60f00bf991e 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorReceiverTest.java @@ -144,7 +144,6 @@ void updateEndpointState() { .expectNext(AmqpEndpointState.ACTIVE) .then(() -> receiverHandler.close()) .expectNext(AmqpEndpointState.CLOSED) - .then(() -> reactorReceiver.dispose()) .verifyComplete(); } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java index 356bd1d5e0e9..c26b73e7c5e5 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSenderTest.java @@ -3,28 +3,11 @@ package com.azure.core.amqp.implementation; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.Mockito.doNothing; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.ExponentialAmqpRetryPolicy; import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.exception.AmqpResponseCode; import com.azure.core.amqp.implementation.handler.SendLinkHandler; -import java.io.IOException; -import java.time.Duration; -import java.util.Arrays; -import java.util.List; - import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.UnsignedLong; import org.apache.qpid.proton.amqp.messaging.Accepted; @@ -52,6 +35,23 @@ import reactor.core.publisher.ReplayProcessor; import reactor.test.StepVerifier; +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.List; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + /** * Unit tests for {@link ReactorSender} */ @@ -98,7 +98,6 @@ public void setup() throws IOException { FluxSink sink1 = endpointStateReplayProcessor.sink(); sink1.next(EndpointState.ACTIVE); - when(handler.getErrors()).thenReturn(Flux.empty()); when(tokenManager.getAuthorizationResults()).thenReturn(Flux.just(AmqpResponseCode.ACCEPTED)); when(sender.getCredit()).thenReturn(100); when(sender.advance()).thenReturn(true); diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java index e88388d2e44d..061d2fa25c35 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/ReactorSessionTest.java @@ -4,15 +4,23 @@ package com.azure.core.amqp.implementation; import com.azure.core.amqp.AmqpEndpointState; +import com.azure.core.amqp.AmqpLink; import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.AmqpRetryPolicy; import com.azure.core.amqp.ClaimsBasedSecurityNode; +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpResponseCode; +import com.azure.core.amqp.implementation.handler.SendLinkHandler; import com.azure.core.amqp.implementation.handler.SessionHandler; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Record; +import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.reactor.Reactor; -import org.apache.qpid.proton.reactor.Selectable; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -20,12 +28,20 @@ import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import java.io.IOException; import java.time.Duration; - +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -39,52 +55,61 @@ public class ReactorSessionTest { private SessionHandler handler; private ReactorSession reactorSession; - private AmqpRetryPolicy retryPolicy; @Mock private Session session; @Mock private Reactor reactor; @Mock - private Selectable selectable; - @Mock private Event event; @Mock + private Receiver receiver; + @Mock + private Sender sender; + @Mock + private Record record; + @Mock private ClaimsBasedSecurityNode cbsNode; @Mock private MessageSerializer serializer; @Mock private ReactorProvider reactorProvider; + @Mock + private ReactorHandlerProvider reactorHandlerProvider; + @Mock + private ReactorDispatcher reactorDispatcher; + @Mock + private TokenManagerProvider tokenManagerProvider; + + private Mono cbsNodeSupplier; @BeforeEach public void setup() throws IOException { MockitoAnnotations.initMocks(this); - when(reactor.selectable()).thenReturn(selectable); - when(event.getSession()).thenReturn(session); - ReactorDispatcher dispatcher = new ReactorDispatcher(reactor); - this.handler = new SessionHandler(ID, HOST, ENTITY_PATH, dispatcher, Duration.ofSeconds(60)); + this.handler = new SessionHandler(ID, HOST, ENTITY_PATH, reactorDispatcher, Duration.ofSeconds(60)); + this.cbsNodeSupplier = Mono.just(cbsNode); when(reactorProvider.getReactor()).thenReturn(reactor); - when(reactorProvider.getReactorDispatcher()).thenReturn(dispatcher); - - MockReactorHandlerProvider handlerProvider = new MockReactorHandlerProvider(reactorProvider, null, handler, null, null); - AzureTokenManagerProvider azureTokenManagerProvider = new AzureTokenManagerProvider( - CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, HOST, "a-test-scope"); - this.retryPolicy = RetryUtil.getRetryPolicy(new AmqpRetryOptions()); - this.reactorSession = new ReactorSession(session, handler, NAME, reactorProvider, handlerProvider, - Mono.just(cbsNode), azureTokenManagerProvider, serializer, TIMEOUT, retryPolicy); + when(reactorProvider.getReactorDispatcher()).thenReturn(reactorDispatcher); + when(event.getSession()).thenReturn(session); + when(sender.attachments()).thenReturn(record); + when(receiver.attachments()).thenReturn(record); + + doAnswer(invocation -> { + final Runnable runnable = invocation.getArgument(0); + runnable.run(); + return null; + }).when(reactorDispatcher).invoke(any()); + + AmqpRetryPolicy retryPolicy = RetryUtil.getRetryPolicy(new AmqpRetryOptions()); + this.reactorSession = new ReactorSession(session, handler, NAME, reactorProvider, reactorHandlerProvider, + cbsNodeSupplier, tokenManagerProvider, serializer, TIMEOUT, retryPolicy); } @AfterEach public void teardown() { - session = null; - reactor = null; - selectable = null; - event = null; - cbsNode = null; - Mockito.framework().clearInlineMocks(); } @@ -108,7 +133,6 @@ public void verifyEndpointStates() { .expectNext(AmqpEndpointState.ACTIVE) .then(() -> handler.close()) .expectNext(AmqpEndpointState.CLOSED) - .then(() -> reactorSession.dispose()) .expectComplete() .verify(Duration.ofSeconds(10)); } @@ -116,6 +140,92 @@ public void verifyEndpointStates() { @Test public void verifyDispose() { reactorSession.dispose(); - Assertions.assertTrue(reactorSession.isDisposed()); + assertTrue(reactorSession.isDisposed()); + } + + /** + * Verifies that we can create the producer. + */ + @Test + void createProducer() { + // Arrange + final String linkName = "test-link-name"; + final String entityPath = "test-entity-path"; + final AmqpRetryPolicy amqpRetryPolicy = mock(AmqpRetryPolicy.class); + final Map linkProperties = new HashMap<>(); + final Duration timeout = Duration.ofSeconds(30); + final TokenManager tokenManager = mock(TokenManager.class); + final SendLinkHandler sendLinkHandler = new SendLinkHandler(ID, HOST, linkName, entityPath); + + when(session.sender(linkName)).thenReturn(sender); + when(tokenManagerProvider.getTokenManager(cbsNodeSupplier, entityPath)).thenReturn(tokenManager); + when(tokenManager.authorize()).thenReturn(Mono.just(1000L)); + when(tokenManager.getAuthorizationResults()) + .thenReturn(Flux.create(sink -> sink.next(AmqpResponseCode.ACCEPTED))); + when(reactorHandlerProvider.createSendLinkHandler(ID, HOST, linkName, entityPath)) + .thenReturn(sendLinkHandler); + + StepVerifier.create( + reactorSession.createProducer(linkName, entityPath, timeout, amqpRetryPolicy, linkProperties)) + .then(() -> handler.onSessionRemoteOpen(event)) + .thenAwait(Duration.ofSeconds(2)) + .assertNext(producer -> assertTrue(producer instanceof ReactorSender)) + .verifyComplete(); + + final AmqpLink sendLink = reactorSession.createProducer(linkName, entityPath, timeout, amqpRetryPolicy, + linkProperties) + .block(TIMEOUT); + + assertNotNull(sendLink); + } + + /** + * Verifies that we can create the producer. + */ + @Test + void createProducerAgainAfterException() { + // Arrange + final String linkName = "test-link-name"; + final String entityPath = "test-entity-path"; + final AmqpRetryPolicy amqpRetryPolicy = mock(AmqpRetryPolicy.class); + final Map linkProperties = new HashMap<>(); + final Duration timeout = Duration.ofSeconds(30); + final TokenManager tokenManager = mock(TokenManager.class); + final SendLinkHandler sendLinkHandler = new SendLinkHandler(ID, HOST, linkName, entityPath); + + final Event closeSendEvent = mock(Event.class); + when(closeSendEvent.getLink()).thenReturn(sender); + + final ErrorCondition errorCondition = new ErrorCondition( + Symbol.valueOf(AmqpErrorCondition.SERVER_BUSY_ERROR.getErrorCondition()), "test-busy"); + when(sender.getRemoteCondition()).thenReturn(errorCondition); + + when(session.sender(linkName)).thenReturn(sender); + when(tokenManagerProvider.getTokenManager(cbsNodeSupplier, entityPath)).thenReturn(tokenManager); + when(tokenManager.authorize()).thenReturn(Mono.just(1000L)); + when(tokenManager.getAuthorizationResults()) + .thenReturn(Flux.create(sink -> sink.next(AmqpResponseCode.ACCEPTED))); + when(reactorHandlerProvider.createSendLinkHandler(ID, HOST, linkName, entityPath)) + .thenReturn(sendLinkHandler); + + handler.onSessionRemoteOpen(event); + + final AmqpLink sendLink = reactorSession.createProducer(linkName, entityPath, timeout, amqpRetryPolicy, + linkProperties) + .block(TIMEOUT); + + assertNotNull(sendLink); + assertTrue(sendLink instanceof AmqpSendLink); + + // Act + sendLinkHandler.onLinkRemoteClose(closeSendEvent); + } + + @Test + void createConsumer() { + // Arrange + final String linkName = "test-link-name"; + final String entityPath = "test-entity-path"; + final AmqpRetryPolicy amqpRetryPolicy = mock(AmqpRetryPolicy.class); } } diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelTest.java index 80cfc91f2985..665bbeada14d 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RequestResponseChannelTest.java @@ -31,7 +31,6 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; import reactor.core.publisher.DirectProcessor; -import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.ReplayProcessor; import reactor.test.StepVerifier; @@ -121,11 +120,9 @@ void beforeEach() { FluxSink sink1 = endpointStateReplayProcessor.sink(); sink1.next(EndpointState.ACTIVE); when(receiveLinkHandler.getEndpointStates()).thenReturn(endpointStateReplayProcessor); - when(receiveLinkHandler.getErrors()).thenReturn(Flux.never()); when(receiveLinkHandler.getDeliveredMessages()).thenReturn(deliveryProcessor); when(sendLinkHandler.getEndpointStates()).thenReturn(endpointStateReplayProcessor); - when(sendLinkHandler.getErrors()).thenReturn(Flux.never()); } @AfterEach diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/HandlerTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/HandlerTest.java index 78895e08989a..c0a73a451ae2 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/HandlerTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/HandlerTest.java @@ -24,14 +24,7 @@ public void initialHandlerState() { StepVerifier.create(handler.getEndpointStates()) .expectNext(EndpointState.UNINITIALIZED) .then(handler::close) - .verifyComplete(); - } - - @Test - public void initialErrors() { - // Act & Assert - StepVerifier.create(handler.getErrors()) - .then(handler::close) + .expectNext(EndpointState.CLOSED) .verifyComplete(); } @@ -44,6 +37,7 @@ public void propagatesStates() { .expectNext(EndpointState.ACTIVE) .then(() -> handler.onNext(EndpointState.ACTIVE)) .then(handler::close) + .expectNext(EndpointState.CLOSED) .verifyComplete(); } @@ -54,11 +48,55 @@ public void propagatesErrors() { final Throwable exception = new AmqpException(false, "Some test message.", context); // Act & Assert - StepVerifier.create(handler.getErrors()) - .then(() -> handler.onNext(exception)) - .expectNext(exception) - .then(handler::close) - .verifyComplete(); + StepVerifier.create(handler.getEndpointStates()) + .expectNext(EndpointState.UNINITIALIZED) + .then(() -> handler.onError(exception)) + .expectNext(EndpointState.CLOSED) + .expectErrorMatches(e -> e.equals(exception)) + .verify(); + } + + @Test + public void propagatesErrorsOnce() { + // Arrange + final AmqpErrorContext context = new AmqpErrorContext("test namespace."); + final Throwable exception = new AmqpException(false, "Some test message.", context); + final Throwable exception2 = new AmqpException(false, "Some test message2.", context); + + // Act & Assert + StepVerifier.create(handler.getEndpointStates()) + .expectNext(EndpointState.UNINITIALIZED) + .then(() -> { + handler.onError(exception); + handler.onError(exception2); + }) + .expectNext(EndpointState.CLOSED) + .expectErrorMatches(e -> e.equals(exception)) + .verify(); + + StepVerifier.create(handler.getEndpointStates()) + .expectNext(EndpointState.CLOSED) + .expectErrorMatches(e -> e.equals(exception)) + .verify(); + } + + @Test + public void completesOnce() { + // Act & Assert + StepVerifier.create(handler.getEndpointStates()) + .expectNext(EndpointState.UNINITIALIZED) + .then(() -> handler.onNext(EndpointState.ACTIVE)) + .expectNext(EndpointState.ACTIVE) + .then(() -> handler.close()) + .expectNext(EndpointState.CLOSED) + .expectComplete() + .verify(); + + // The last state is always replayed before it is closed. + StepVerifier.create(handler.getEndpointStates()) + .expectNext(EndpointState.CLOSED) + .expectComplete() + .verify(); } private static class TestHandler extends Handler { diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/LinkHandlerTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/LinkHandlerTest.java index f1ff19e8f33c..d0060c447f36 100644 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/LinkHandlerTest.java +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/handler/LinkHandlerTest.java @@ -28,6 +28,7 @@ import static com.azure.core.amqp.exception.AmqpErrorCondition.LINK_STOLEN; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.same; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyZeroInteractions; @@ -51,7 +52,6 @@ class LinkHandlerTest { private final String description = "test-description"; private final LinkHandler handler = new MockLinkHandler(CONNECTION_ID, HOSTNAME, ENTITY_PATH, logger); - @BeforeAll static void beforeAll() { StepVerifier.setDefaultTimeout(Duration.ofSeconds(30)); @@ -147,18 +147,17 @@ void onLinkRemoteClose() { when(session.getLocalState()).thenReturn(EndpointState.ACTIVE); // Act - StepVerifier.Step endpointState = StepVerifier.create(handler.getEndpointStates()) - .expectNext(EndpointState.CLOSED); - - StepVerifier.Step throwableStep = StepVerifier.create(handler.getErrors()) - .assertNext(error -> { + StepVerifier.create(handler.getEndpointStates()) + .expectNext(EndpointState.UNINITIALIZED) + .then(() -> handler.onLinkRemoteClose(event)) + .expectNext(EndpointState.CLOSED) + .expectErrorSatisfies(error -> { Assertions.assertTrue(error instanceof AmqpException); AmqpException exception = (AmqpException) error; Assertions.assertEquals(LINK_STOLEN, exception.getErrorCondition()); - }); - - handler.onLinkRemoteClose(event); + }) + .verify(); // Assert verify(link).setCondition(errorCondition); @@ -166,17 +165,13 @@ void onLinkRemoteClose() { verify(session, never()).setCondition(errorCondition); verify(session, never()).close(); - - endpointState.thenCancel().verify(); - throwableStep.then(() -> handler.close()) - .verifyComplete(); } /** - * Verifies that it does not close the link when the link is already in a closed endpoint state. + * Verifies that an error is propagated if there is an error condition on close. */ @Test - void onLinkRemoteCloseNoException() { + void onLinkRemoteCloseWithErrorCondition() { // Arrange final ErrorCondition errorCondition = new ErrorCondition(symbol, description); @@ -185,19 +180,17 @@ void onLinkRemoteCloseNoException() { when(link.getLocalState()).thenReturn(EndpointState.CLOSED); // Act & Assert - StepVerifier.Step endpointState = StepVerifier.create(handler.getEndpointStates()) + StepVerifier.create(handler.getEndpointStates()) + .expectNext(EndpointState.UNINITIALIZED) + .then(() -> handler.onLinkRemoteClose(event)) .expectNext(EndpointState.CLOSED) - .expectNoEvent(Duration.ofSeconds(2)); - - StepVerifier.Step throwableStep = StepVerifier.create(handler.getErrors()) - .assertNext(error -> { + .expectErrorSatisfies(error -> { Assertions.assertTrue(error instanceof AmqpException); AmqpException exception = (AmqpException) error; Assertions.assertEquals(LINK_STOLEN, exception.getErrorCondition()); - }); - - handler.onLinkRemoteClose(event); + }) + .verify(); // Assert verify(link, never()).setCondition(errorCondition); @@ -205,11 +198,39 @@ void onLinkRemoteCloseNoException() { verify(session, never()).setCondition(errorCondition); verify(session, never()).close(); + } - endpointState.thenCancel().verify(); - throwableStep.thenCancel().verify(); + /** + * Verifies that no error is propagated. And it is closed instead. + */ + @Test + void onLinkRemoteCloseNoErrorCondition() { + // Arrange + final ErrorCondition errorCondition = new ErrorCondition(null, description); + final Event finalEvent = mock(Event.class); + + when(link.getRemoteCondition()).thenReturn(errorCondition); + when(link.getSession()).thenReturn(session); + when(link.getLocalState()).thenReturn(EndpointState.CLOSED); + + // Act & Assert + StepVerifier.create(handler.getEndpointStates()) + .expectNext(EndpointState.UNINITIALIZED) + .then(() -> handler.onLinkRemoteClose(event)) + .expectNext(EndpointState.CLOSED) + .then(() -> handler.onLinkFinal(finalEvent)) + .expectComplete() + .verify(); + + // Assert + verify(link, never()).setCondition(errorCondition); + verify(link, never()).close(); + + verify(session, never()).setCondition(errorCondition); + verify(session, never()).close(); } + private static final class MockLinkHandler extends LinkHandler { MockLinkHandler(String connectionId, String hostname, String entityPath, ClientLogger logger) { super(connectionId, hostname, entityPath, logger); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java index d20613d9a5a0..604961035346 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java @@ -667,5 +667,4 @@ private ProxyOptions getProxyOptions(ProxyAuthenticationType authentication, Str coreProxyOptions.getAddress()), coreProxyOptions.getUsername(), coreProxyOptions.getPassword()); } } - } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java index aa41cdd1b7ce..c673fd322176 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java @@ -127,7 +127,7 @@ void stopAllPartitionPumps() { */ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoint) { if (partitionPumps.containsKey(claimedOwnership.getPartitionId())) { - logger.verbose("Consumer is already running for this partition {}", claimedOwnership.getPartitionId()); + logger.verbose("Consumer is already running for this partition {}", claimedOwnership.getPartitionId()); return; } diff --git a/sdk/eventhubs/microsoft-azure-eventhubs/README.md b/sdk/eventhubs/microsoft-azure-eventhubs/README.md index 95b9c61dbed2..1df6ef7457e4 100644 --- a/sdk/eventhubs/microsoft-azure-eventhubs/README.md +++ b/sdk/eventhubs/microsoft-azure-eventhubs/README.md @@ -1,7 +1,7 @@ # Azure Event Hubs (Track 1) client library for Java -

Microsoft Azure Event Hubs Client for Java +

Microsoft Azure Event Hubs Client for Java

Azure Event Hubs is a hyper-scale data ingestion service, fully-managed by Microsoft, that enables you to collect, store and process trillions of events from websites, apps, IoT devices, and any stream of data. @@ -15,7 +15,7 @@ general and for an overview of Event Hubs Client for Java. - An **Event Hub producer** is a source of telemetry data, diagnostics information, usage logs, or other log data, as part of an embedded device solution, a mobile device application, a game title running on a console or other device, - some client or server based business solution, or a web site. + some client or server based business solution, or a website. - An **Event Hub consumer** picks up such information from the Event Hub and processes it. Processing may involve aggregation, complex computation, and filtering. Processing may also involve distribution or storage of the @@ -29,13 +29,15 @@ general and for an overview of Event Hubs Client for Java. - A **consumer group** is a view of an entire Event Hub. Consumer groups enable multiple consuming applications to each have a separate view of the event stream, and to read the stream independently at their own pace and from their own - position. There can be at most 5 concurrent readers on a partition per consumer group; however it is recommended that - there is only one active consumer for a given partition and consumer group pairing. Each active reader receives all of + position. There can be at most 5 concurrent readers on a partition per consumer group; however it is recommended + there is only one active consumer for a given partition and consumer group pairing. Each active reader receives all the events from its partition; if there are multiple readers on the same partition, then they will receive duplicate events. -For more concepts and deeper discussion, see: [Event Hubs Features][event_hubs_features]. Also, the concepts for AMQP -are well documented in [OASIS Advanced Messaging Queuing Protocol (AMQP) Version 1.0][oasis_amqp_v1]. +For more concepts and deeper discussion, see: +[Event Hubs Features](https://docs.microsoft.com/azure/event-hubs/event-hubs-features). Also, the concepts for AMQP +are well documented in [OASIS Advanced Messaging Queuing Protocol (AMQP) Version +1.0](http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-overview-v1.0-os.html). ### Referencing the library @@ -50,12 +52,12 @@ the required versions of Apache Qpid Proton-J, and the cryptography library BCPK |--------|------------------| |azure-eventhubs|[![Maven Central](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.microsoft.azure/azure-eventhubs) -```XML - - com.microsoft.azure - azure-eventhubs - 2.3.1 - +```xml + + com.microsoft.azure + azure-eventhubs + 2.3.1 + ``` #### Microsoft Azure EventHubs Java Event Processor Host library @@ -80,14 +82,14 @@ It pulls the required versions of Event Hubs, Azure Storage and GSon libraries. First, if you experience any issues with the runtime behavior of the Azure Event Hubs service, please consider filing a support request right away. Your options for [getting support are enumerated here](https://azure.microsoft.com/support/options/). In the Azure portal, you can file a support request from the "Help -and support" menu in the upper right hand corner of the page. +and support" menu in the upper right-hand corner of the page. If you find issues in this library or have suggestions for improvement of code or documentation, you can [file an issue in the project's GitHub repository](https://github.com/Azure/azure-sdk-for-java/issues) or send across a pull request - see our [Contribution Guidelines](../azure-messaging-eventhubs/CONTRIBUTING.md). Issues related to runtime behavior of the service, such as sporadic exceptions or apparent service-side performance or -reliability issues can not be handled here. +reliability issues cannot be handled here. Generally, if you want to discuss Azure Event Hubs or this client library with the community and the maintainers, you can turn to [stackoverflow.com under the #azure-eventhub tag](http://stackoverflow.com/questions/tagged/azure-eventhub) diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java index bc4d3848da80..6a5d79386fdb 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorReceiverTest.java @@ -102,7 +102,6 @@ void setup(TestInfo testInfo) throws IOException { when(receiveLinkHandler.getDeliveredMessages()).thenReturn(deliveryProcessor); when(receiveLinkHandler.getLinkName()).thenReturn(LINK_NAME); when(receiveLinkHandler.getEndpointStates()).thenReturn(endpointStates); - when(receiveLinkHandler.getErrors()).thenReturn(Flux.never()); when(tokenManager.getAuthorizationResults()).thenReturn(Flux.create(sink -> sink.next(AmqpResponseCode.OK))); diff --git a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java index 2f4b1ea09b5c..60988b94fc83 100644 --- a/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java +++ b/sdk/servicebus/azure-messaging-servicebus/src/test/java/com/azure/messaging/servicebus/implementation/ServiceBusReactorSessionTest.java @@ -146,7 +146,6 @@ void setup(TestInfo testInfo) throws IOException { sink1.next(EndpointState.ACTIVE); when(handler.getHostname()).thenReturn(HOSTNAME); when(handler.getConnectionId()).thenReturn(CONNECTION_ID); - when(handler.getErrors()).thenReturn(Flux.empty()); when(handlerProvider.createSendLinkHandler(CONNECTION_ID, HOSTNAME, viaEntityPathSenderLinkName, viaEntityPath)) .thenReturn(sendViaEntityLinkHandler); @@ -165,9 +164,6 @@ void setup(TestInfo testInfo) throws IOException { when(sendViaEntityLinkHandler.getEndpointStates()).thenReturn(endpointStateReplayProcessor); when(sendEntityLinkHandler.getEndpointStates()).thenReturn(endpointStateReplayProcessor); - when(sendViaEntityLinkHandler.getErrors()).thenReturn(Flux.empty()); - when(sendEntityLinkHandler.getErrors()).thenReturn(Flux.empty()); - when(tokenManagerProvider.getTokenManager(cbsNodeSupplier, viaEntityPath)).thenReturn(tokenManagerViaQueue); when(tokenManagerProvider.getTokenManager(cbsNodeSupplier, entityPath)).thenReturn(tokenManagerEntity);