Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion eng/jacoco-test-coverage/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.1.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>2.2.0</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
2 changes: 1 addition & 1 deletion eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ com.azure:azure-communication-identity;1.1.0;1.2.0-beta.1
com.azure:azure-communication-phonenumbers;1.0.1;1.1.0-beta.1
com.azure:azure-containers-containerregistry;1.0.0-beta.2;1.0.0-beta.3
com.azure:azure-core;1.16.0;1.17.0
com.azure:azure-core-amqp;2.0.6;2.1.0-beta.1
com.azure:azure-core-amqp;2.0.6;2.2.0
com.azure:azure-core-amqp-experimental;1.0.0-beta.1;1.0.0-beta.1
com.azure:azure-core-experimental;1.0.0-beta.13;1.0.0-beta.14
com.azure:azure-core-http-jdk-httpclient;1.0.0-beta.1;1.0.0-beta.1
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp-experimental/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.1.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>2.2.0</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
</dependency>

<dependency>
Expand Down
19 changes: 4 additions & 15 deletions sdk/core/azure-core-amqp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
# Release History

## 2.1.0-beta.1 (Unreleased)

## 2.2.0 (2021-06-10)
### New Features
- Exposing CbsAuthorizationType.
- Exposing ManagementNode that can perform management and metadata operations on an AMQP message broker.
- AmqpConnection, AmqpSession, AmqpSendLink, and AmqpReceiveLink extend from AsyncCloseable.
- Delivery outcomes and delivery states are added.
- Adding support for AMQP data types SEQUENCE and VALUE.

### Bug Fixes
- Fixed a bug where connection and sessions would not be disposed when their endpoint closed.
### Dependency Updates
- Upgraded `azure-core` dependency to `1.15.0`.

## 2.0.6 (2021-05-24)
### Bug Fixes
Expand All @@ -22,13 +18,6 @@
- Upgraded `azure-core` from `1.15.0` to `1.16.0`.
- Upgraded Reactor from `3.4.3` to `3.4.5`.

## 2.2.0-beta.1 (2021-04-14)
### New Features
- Adding support for AMQP data types SEQUENCE and VALUE.

### Dependency Updates
- Upgraded `azure-core` dependency to `1.15.0`.

## 2.0.4 (2021-04-12)

### Bug Fixes
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/azure-core-amqp/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.1.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>2.2.0</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<packaging>jar</packaging>

<name>Microsoft Azure Java Core AMQP Library</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,13 @@ private Mono<Void> isAuthorized() {
.next()
.switchIfEmpty(Mono.error(new AmqpException(false, "Did not get response from tokenManager: " + entityPath, getErrorContext())))
.handle((response, sink) -> {
if (response != AmqpResponseCode.ACCEPTED && response != AmqpResponseCode.OK) {
if (RequestResponseUtils.isSuccessful(response)) {
sink.complete();
} else {
final String message = String.format("User does not have authorization to perform operation "
+ "on entity [%s]. Response: [%s]", entityPath, response);
sink.error(ExceptionUtil.amqpResponseCodeToException(response.getValue(), message,
getErrorContext()));

} else {
sink.complete();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,17 +464,12 @@ private synchronized void closeConnectionWork() {
}

connection.close();
handler.close();

final ArrayList<Mono<Void>> closingSessions = new ArrayList<>();
sessionMap.values().forEach(link -> closingSessions.add(link.isClosed()));

final Mono<Void> closedExecutor;
if (executor != null) {
closedExecutor = executor.isClosed();
executor.close();
} else {
closedExecutor = Mono.empty();
}
final Mono<Void> closedExecutor = executor != null ? executor.closeAsync() : Mono.empty();

// Close all the children.
final Mono<Void> closeSessionsMono = Mono.when(closingSessions)
Expand All @@ -491,7 +486,6 @@ private synchronized void closeConnectionWork() {
return false;
});

handler.close();
subscriptions.dispose();
}));

Expand Down Expand Up @@ -521,10 +515,6 @@ private synchronized Connection getOrCreateConnection() throws IOException {

final ReactorExceptionHandler reactorExceptionHandler = new ReactorExceptionHandler();

reactorProvider.getReactorDispatcher().getShutdownSignal()
.subscribe(signal -> reactorExceptionHandler.onConnectionShutdown(signal),
error -> reactorExceptionHandler.onConnectionError(error));

// Use a new single-threaded scheduler for this connection as QPID's Reactor is not thread-safe.
// Using Schedulers.single() will use the same thread for all connections in this process which
// limits the scalability of the no. of concurrent connections a single process can have.
Expand All @@ -539,6 +529,22 @@ private synchronized Connection getOrCreateConnection() throws IOException {
reactorExceptionHandler, pendingTasksDuration,
connectionOptions.getFullyQualifiedNamespace());

// To avoid inconsistent synchronization of executor, we set this field with the closeAsync method.
// It will not be kicked off until subscribed to.
final Mono<Void> executorCloseMono = executor.closeAsync();
reactorProvider.getReactorDispatcher().getShutdownSignal()
.flatMap(signal -> {
logger.info("Shutdown signal received from reactor provider.");
reactorExceptionHandler.onConnectionShutdown(signal);
return executorCloseMono;
})
.onErrorResume(error -> {
logger.info("Error received from reactor provider.", error);
reactorExceptionHandler.onConnectionError(error);
return executorCloseMono;
})
.subscribe();

executor.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.amqp.AmqpShutdownSignal;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.AsyncCloseable;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.engine.HandlerException;
Expand All @@ -14,7 +15,6 @@
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Scheduler;

import java.io.Closeable;
import java.nio.channels.UnresolvedAddressException;
import java.time.Duration;
import java.util.Locale;
Expand All @@ -23,7 +23,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ReactorExecutor implements Closeable {
/**
* Schedules the proton-j reactor to continuously run work.
*/
class ReactorExecutor implements AsyncCloseable {
private static final String LOG_MESSAGE = "connectionId[{}], message[{}]";

private final ClientLogger logger = new ClientLogger(ReactorExecutor.class);
Expand Down Expand Up @@ -51,7 +54,7 @@ class ReactorExecutor implements Closeable {

/**
* Starts the reactor and will begin processing any reactor events until there are no longer any left or {@link
* #close()} is called.
* #closeAsync()} is called.
*/
void start() {
if (hasStarted.getAndSet(true)) {
Expand Down Expand Up @@ -142,10 +145,6 @@ private void run() {
}
}

Mono<Void> isClosed() {
return isClosedMono.asMono();
}

/**
* Schedules the release of the current reactor after operation timeout has elapsed.
*/
Expand Down Expand Up @@ -175,26 +174,27 @@ private void scheduleCompletePendingTasks() {
}, timeout.toMillis(), TimeUnit.MILLISECONDS);
}

@Override
public void close() {
if (isDisposed.getAndSet(true)) {
return;
}

if (hasStarted.get()) {
scheduleCompletePendingTasks();
}
}

private void close(String reason) {
logger.verbose("Completing close and disposing scheduler. {}", reason);

scheduler.dispose();
isClosedMono.emitEmpty((signalType, emitResult) -> {
logger.verbose("signalType[{}] emitResult[{}]: Unable to emit close event on reactor", signalType,
emitResult);
return false;
});
exceptionHandler.onConnectionShutdown(new AmqpShutdownSignal(false, false, reason));
scheduler.dispose();
}

@Override
public Mono<Void> closeAsync() {
if (isDisposed.getAndSet(true)) {
return isClosedMono.asMono();
}

if (hasStarted.get()) {
scheduleCompletePendingTasks();
}

return isClosedMono.asMono();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public class RequestResponseUtils {

public static boolean isSuccessful(Message message) {
final AmqpResponseCode statusCode = getStatusCode(message);
return isSuccessful(statusCode);
}

public static boolean isSuccessful(AmqpResponseCode statusCode) {
return statusCode == AmqpResponseCode.OK || statusCode == AmqpResponseCode.ACCEPTED;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,20 @@

package com.azure.core.amqp.models;

import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;

/**
* Class contains sample code snippets that will be used in javadocs.
*/
public class AmqpAnnotatedMessageJavaDocCodeSamples {
/**
* Get message body from {@link AmqpAnnotatedMessage}.
*/
@Test
public void checkBodyType() {
AmqpAnnotatedMessage amqpAnnotatedMessage = null;
AmqpAnnotatedMessage amqpAnnotatedMessage =
new AmqpAnnotatedMessage(AmqpMessageBody.fromData("my-amqp-message".getBytes(StandardCharsets.UTF_8)));
// BEGIN: com.azure.core.amqp.models.AmqpBodyType.checkBodyType
// If client do not check `AmqpMessageBody.getBodyType()` and payload is not of type `AmqpMessageBodyType.DATA`,
// calling `getFirstData()` or `getData()` on `AmqpMessageBody` will throw Runtime exception.
Expand Down
2 changes: 1 addition & 1 deletion sdk/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.1.0-beta.1</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
<version>2.2.0</version> <!-- {x-version-update;com.azure:azure-core-amqp;current} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,14 @@ private <T> Mono<T> getProperties(Map<String, Object> properties, Class<T> respo
.handle((message, sink) -> {
if (RequestResponseUtils.isSuccessful(message)) {
sink.next(messageSerializer.deserialize(message, responseType));
}

final AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(message);
final String statusDescription = RequestResponseUtils.getStatusDescription(message);
final Throwable error = ExceptionUtil.amqpResponseCodeToException(statusCode.getValue(),
statusDescription, channel.getErrorContext());
} else {
final AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(message);
final String statusDescription = RequestResponseUtils.getStatusDescription(message);
final Throwable error = ExceptionUtil.amqpResponseCodeToException(statusCode.getValue(),
statusDescription, channel.getErrorContext());

sink.error(logger.logExceptionAsWarning(Exceptions.propagate(error)));
sink.error(logger.logExceptionAsWarning(Exceptions.propagate(error)));
}
}));
});
}
Expand Down