Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# Release History

## 1.5.0-beta.1 (Unreleased)
- Added Amqp Message envelope which can be accessed using `AmqpAnnotatedMessage`.
- Remove unused and duplicate logic for Handlers.getErrors().
- Close children sessions and links when a connection is disposed.
- Added AMQP Message envelope which can be accessed using `AmqpAnnotatedMessage`.

## 1.4.0 (2020-08-11)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ public class AmqpRetryOptions {
* Creates an instance with the default retry options set.
*/
public AmqpRetryOptions() {
maxRetries = 3;
delay = Duration.ofMillis(800);
maxDelay = Duration.ofMinutes(1);
tryTimeout = Duration.ofMinutes(1);
retryMode = AmqpRetryMode.EXPONENTIAL;
this.maxRetries = 3;
this.delay = Duration.ofMillis(800);
this.maxDelay = Duration.ofMinutes(1);
this.tryTimeout = Duration.ofMinutes(1);
this.retryMode = AmqpRetryMode.EXPONENTIAL;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@
import com.azure.core.amqp.implementation.handler.ConnectionHandler;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.BaseHandler;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.reactor.Reactor;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
Expand All @@ -35,6 +36,8 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.azure.core.amqp.implementation.ClientConstants.NOT_APPLICABLE;

public class ReactorConnection implements AmqpConnection {
private static final String CBS_SESSION_NAME = "cbs-session";
private static final String CBS_ADDRESS = "$cbs";
Expand All @@ -43,12 +46,10 @@ public class ReactorConnection implements AmqpConnection {
private final ClientLogger logger = new ClientLogger(ReactorConnection.class);
private final ConcurrentMap<String, SessionSubscription> sessionMap = new ConcurrentHashMap<>();

private final AtomicBoolean hasConnection = new AtomicBoolean();
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final DirectProcessor<AmqpShutdownSignal> shutdownSignals = DirectProcessor.create();
private final ReplayProcessor<AmqpEndpointState> endpointStates =
ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED);
private FluxSink<AmqpEndpointState> endpointStatesSink = endpointStates.sink(FluxSink.OverflowStrategy.BUFFER);
private final FluxSink<AmqpShutdownSignal> shutdownSignalsSink = shutdownSignals.sink();
private final ReplayProcessor<AmqpEndpointState> endpointStates;

private final String connectionId;
private final Mono<Connection> connectionMono;
Expand All @@ -58,7 +59,6 @@ public class ReactorConnection implements AmqpConnection {
private final MessageSerializer messageSerializer;
private final ConnectionOptions connectionOptions;
private final ReactorProvider reactorProvider;
private final Disposable.Composite subscriptions;
private final AmqpRetryPolicy retryPolicy;
private final SenderSettleMode senderSettleMode;
private final ReceiverSettleMode receiverSettleMode;
Expand Down Expand Up @@ -86,8 +86,8 @@ public class ReactorConnection implements AmqpConnection {
*/
public ReactorConnection(String connectionId, ConnectionOptions connectionOptions, ReactorProvider reactorProvider,
ReactorHandlerProvider handlerProvider, TokenManagerProvider tokenManagerProvider,
MessageSerializer messageSerializer, String product, String clientVersion,
SenderSettleMode senderSettleMode, ReceiverSettleMode receiverSettleMode) {
MessageSerializer messageSerializer, String product, String clientVersion, SenderSettleMode senderSettleMode,
ReceiverSettleMode receiverSettleMode) {

this.connectionOptions = connectionOptions;
this.reactorProvider = reactorProvider;
Expand All @@ -103,26 +103,14 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption
this.senderSettleMode = senderSettleMode;
this.receiverSettleMode = receiverSettleMode;

this.connectionMono = Mono.fromCallable(this::getOrCreateConnection)
.doOnSubscribe(c -> hasConnection.set(true));

this.subscriptions = Disposables.composite(
this.handler.getEndpointStates().subscribe(
state -> {
logger.verbose("connectionId[{}]: Connection state: {}", connectionId, state);
endpointStatesSink.next(AmqpEndpointStateUtil.getConnectionState(state));
}, error -> {
logger.error("connectionId[{}] Error occurred in connection endpoint.", connectionId, error);
endpointStatesSink.error(error);
}, () -> {
endpointStatesSink.next(AmqpEndpointState.CLOSED);
endpointStatesSink.complete();
}),

this.handler.getErrors().subscribe(error -> {
logger.error("connectionId[{}] Error occurred in connection handler.", connectionId, error);
endpointStatesSink.error(error);
}));
this.connectionMono = Mono.fromCallable(this::getOrCreateConnection);

this.endpointStates = this.handler.getEndpointStates()
.takeUntilOther(shutdownSignals)
.map(state -> {
logger.verbose("connectionId[{}]: State {}", connectionId, state);
return AmqpEndpointStateUtil.getConnectionState(state);
}).subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED));
}

/**
Expand All @@ -148,14 +136,12 @@ public Mono<ClaimsBasedSecurityNode> getClaimsBasedSecurityNode() {
"connectionId[%s]: Connection is disposed. Cannot get CBS node.", connectionId))));
}

final Mono<ClaimsBasedSecurityNode> cbsNodeMono = RetryUtil.withRetry(
getEndpointStates().takeUntil(x -> x == AmqpEndpointState.ACTIVE),
connectionOptions.getRetry().getTryTimeout(), retryPolicy)
final Mono<ClaimsBasedSecurityNode> cbsNodeMono =
RetryUtil.withRetry(getEndpointStates().takeUntil(x -> x == AmqpEndpointState.ACTIVE),
connectionOptions.getRetry().getTryTimeout(), retryPolicy)
.then(Mono.fromCallable(this::getOrCreateCBSNode));

return hasConnection.get()
? cbsNodeMono
: connectionMono.then(cbsNodeMono);
return connectionMono.then(cbsNodeMono);
}

@Override
Expand Down Expand Up @@ -249,17 +235,7 @@ protected AmqpSession createSession(String sessionName, Session session, Session
*/
@Override
public boolean removeSession(String sessionName) {
if (sessionName == null) {
return false;
}

final SessionSubscription removed = sessionMap.remove(sessionName);

if (removed != null) {
removed.dispose();
}

return removed != null;
return removeSession(sessionName, null);
}

@Override
Expand All @@ -272,18 +248,23 @@ public boolean isDisposed() {
*/
@Override
public void dispose() {
dispose(null);
shutdownSignalsSink.next(new AmqpShutdownSignal(false, true,
"Disposed by client."));
}

void dispose(ErrorCondition errorCondition) {
if (isDisposed.getAndSet(true)) {
return;
}

logger.info("connectionId[{}]: Disposing of ReactorConnection.", connectionId);
subscriptions.dispose();
endpointStatesSink.complete();
logger.info("connectionId[{}], errorCondition[{}]: Disposing of ReactorConnection.", connectionId,
errorCondition != null ? errorCondition : NOT_APPLICABLE);

final String[] keys = sessionMap.keySet().toArray(new String[0]);
for (String key : keys) {
logger.info("connectionId[{}]: Removing session '{}'", connectionId, key);
removeSession(key);
removeSession(key, errorCondition);
}

if (connection != null) {
Expand Down Expand Up @@ -331,6 +312,20 @@ protected Mono<RequestResponseChannel> createRequestResponseChannel(String sessi
new ClientLogger(String.format("%s<%s>", RequestResponseChannel.class, sessionName))));
}

private boolean removeSession(String sessionName, ErrorCondition errorCondition) {
if (sessionName == null) {
return false;
}

final SessionSubscription removed = sessionMap.remove(sessionName);

if (removed != null) {
removed.dispose(errorCondition);
}

return removed != null;
}

private synchronized ClaimsBasedSecurityNode getOrCreateCBSNode() {
if (cbsChannel == null) {
logger.info("Setting CBS channel.");
Expand Down Expand Up @@ -380,6 +375,7 @@ public void onConnectionError(Throwable exception) {
getId(), getFullyQualifiedNamespace(), exception.getMessage());

endpointStates.onError(exception);
ReactorConnection.this.dispose();
}

@Override
Expand All @@ -393,16 +389,12 @@ void onConnectionShutdown(AmqpShutdownSignal shutdownSignal) {
"onReactorError connectionId[{}], hostName[{}], message[Shutting down], shutdown signal[{}]",
getId(), getFullyQualifiedNamespace(), shutdownSignal.isInitiatedByClient(), shutdownSignal);

if (!endpointStatesSink.isCancelled()) {
endpointStatesSink.next(AmqpEndpointState.CLOSED);
endpointStatesSink.complete();
}

dispose();
dispose(new ErrorCondition(Symbol.getSymbol("onReactorError"), shutdownSignal.toString()));
shutdownSignalsSink.next(shutdownSignal);
}
}

private static final class SessionSubscription implements Disposable {
private static final class SessionSubscription {
private final AtomicBoolean isDisposed = new AtomicBoolean();
private final AmqpSession session;
private final Disposable subscription;
Expand All @@ -412,22 +404,23 @@ private SessionSubscription(AmqpSession session, Disposable subscription) {
this.subscription = subscription;
}

public Disposable getSubscription() {
return subscription;
}

public AmqpSession getSession() {
return session;
}

@Override
public void dispose() {
void dispose(ErrorCondition errorCondition) {
if (isDisposed.getAndSet(true)) {
return;
}

if (session instanceof ReactorSession) {
final ReactorSession reactorSession = (ReactorSession) session;
reactorSession.dispose(errorCondition);
} else {
session.dispose();
}

subscription.dispose();
session.dispose();
}
}
}
Loading