diff --git a/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml b/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml
index b39b25ffc59c..9f2ab7237ea1 100755
--- a/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml
+++ b/eng/code-quality-reports/src/main/resources/checkstyle/checkstyle-suppressions.xml
@@ -473,6 +473,7 @@
+
+
+
+
+
+
+
@@ -603,11 +610,11 @@
-
+
-
+
diff --git a/eng/code-quality-reports/src/main/resources/revapi/revapi.json b/eng/code-quality-reports/src/main/resources/revapi/revapi.json
index bc69bcd19c16..eb8afd82f704 100644
--- a/eng/code-quality-reports/src/main/resources/revapi/revapi.json
+++ b/eng/code-quality-reports/src/main/resources/revapi/revapi.json
@@ -235,6 +235,12 @@
"code": "java\\.class\\.externalClassExposedInAPI",
"new": "(interface|class|enum) io\\.opentelemetry.*",
"justification": "Azure Monitor Exporter is allowed to use OpenTelemetry types in public APIs as it implements interfaces defined by OpenTelemetry"
+ },
+ {
+ "code": "java.annotation.attributeAdded",
+ "old": "class com.azure.messaging.eventhubs.EventHubClientBuilder",
+ "new": "class com.azure.messaging.eventhubs.EventHubClientBuilder",
+ "justification": "Setting protocol to AMQP in @ServiceClientBuilder annotation is not a breaking change"
}
]
}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
index a21eb74f8f0b..0ffe23f98ad1 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
@@ -18,6 +18,7 @@
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.annotation.ServiceClientBuilder;
+import com.azure.core.annotation.ServiceClientProtocol;
import com.azure.core.credential.TokenCredential;
import com.azure.core.exception.AzureException;
import com.azure.core.util.ClientOptions;
@@ -94,7 +95,7 @@
* @see EventHubConsumerClient
*/
@ServiceClientBuilder(serviceClients = {EventHubProducerAsyncClient.class, EventHubProducerClient.class,
- EventHubConsumerAsyncClient.class, EventHubConsumerClient.class})
+ EventHubConsumerAsyncClient.class, EventHubConsumerClient.class}, protocol = ServiceClientProtocol.AMQP)
public class EventHubClientBuilder {
// Default number of events to fetch when creating the consumer.
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java
index 50c3d59d86f2..ee88f75aa6b0 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java
@@ -140,6 +140,7 @@ public Mono getEventHubProperties() {
*
* @return A Flux of identifiers for the partitions of an Event Hub.
*/
+ @ServiceMethod(returns = ReturnType.COLLECTION)
public Flux getPartitionIds() {
return getEventHubProperties().flatMapMany(properties -> Flux.fromIterable(properties.getPartitionIds()));
}
@@ -177,6 +178,7 @@ public Mono getPartitionProperties(String partitionId) {
* @throws NullPointerException if {@code partitionId}, or {@code startingPosition} is null.
* @throws IllegalArgumentException if {@code partitionId} is an empty string.
*/
+ @ServiceMethod(returns = ReturnType.COLLECTION)
public Flux receiveFromPartition(String partitionId, EventPosition startingPosition) {
return receiveFromPartition(partitionId, startingPosition, defaultReceiveOptions);
}
@@ -205,6 +207,7 @@ public Flux receiveFromPartition(String partitionId, EventPositi
* null.
* @throws IllegalArgumentException if {@code partitionId} is an empty string.
*/
+ @ServiceMethod(returns = ReturnType.COLLECTION)
public Flux receiveFromPartition(String partitionId, EventPosition startingPosition,
ReceiveOptions receiveOptions) {
if (Objects.isNull(partitionId)) {
@@ -235,6 +238,7 @@ public Flux receiveFromPartition(String partitionId, EventPositi
*
* @return A stream of events for every partition in the Event Hub starting from the beginning of each partition.
*/
+ @ServiceMethod(returns = ReturnType.COLLECTION)
public Flux receive() {
return receive(true, defaultReceiveOptions);
}
@@ -256,6 +260,7 @@ public Flux receive() {
*
* @return A stream of events for every partition in the Event Hub.
*/
+ @ServiceMethod(returns = ReturnType.COLLECTION)
public Flux receive(boolean startReadingAtEarliestEvent) {
return receive(startReadingAtEarliestEvent, defaultReceiveOptions);
}
@@ -289,6 +294,7 @@ public Flux receive(boolean startReadingAtEarliestEvent) {
*
* @throws NullPointerException if {@code receiveOptions} is null.
*/
+ @ServiceMethod(returns = ReturnType.COLLECTION)
public Flux receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions) {
if (Objects.isNull(receiveOptions)) {
return fluxError(logger, new NullPointerException("'receiveOptions' cannot be null."));
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerClient.java
index bc985d7c54be..7abf3cd381d5 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerClient.java
@@ -94,6 +94,7 @@ public EventHubProperties getEventHubProperties() {
*
* @return The set of identifiers for the partitions of an Event Hub.
*/
+ @ServiceMethod(returns = ReturnType.COLLECTION)
public IterableStream getPartitionIds() {
return new IterableStream<>(consumer.getPartitionIds());
}
@@ -128,6 +129,7 @@ public PartitionProperties getPartitionProperties(String partitionId) {
* @throws IllegalArgumentException if {@code maximumMessageCount} is less than 1, or if {@code partitionId} is an
* empty string.
*/
+ @ServiceMethod(returns = ReturnType.COLLECTION)
public IterableStream receiveFromPartition(String partitionId, int maximumMessageCount,
EventPosition startingPosition) {
return receiveFromPartition(partitionId, maximumMessageCount, startingPosition, timeout);
@@ -150,6 +152,7 @@ public IterableStream receiveFromPartition(String partitionId, i
* @throws IllegalArgumentException if {@code maximumMessageCount} is less than 1 or {@code maximumWaitTime} is
* zero or a negative duration.
*/
+ @ServiceMethod(returns = ReturnType.COLLECTION)
public IterableStream receiveFromPartition(String partitionId, int maximumMessageCount,
EventPosition startingPosition, Duration maximumWaitTime) {
if (Objects.isNull(maximumWaitTime)) {
@@ -197,6 +200,7 @@ public IterableStream receiveFromPartition(String partitionId, i
* @throws IllegalArgumentException if {@code maximumMessageCount} is less than 1 or {@code maximumWaitTime} is
* zero or a negative duration.
*/
+ @ServiceMethod(returns = ReturnType.COLLECTION)
public IterableStream receiveFromPartition(String partitionId, int maximumMessageCount,
EventPosition startingPosition, Duration maximumWaitTime, ReceiveOptions receiveOptions) {
if (Objects.isNull(maximumWaitTime)) {
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java
index 2a502d8afc86..760b7d6b135d 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java
@@ -171,6 +171,7 @@ public Mono getEventHubProperties() {
*
* @return A Flux of identifiers for the partitions of an Event Hub.
*/
+ @ServiceMethod(returns = ReturnType.COLLECTION)
public Flux getPartitionIds() {
return getEventHubProperties().flatMapMany(properties -> Flux.fromIterable(properties.getPartitionIds()));
}
@@ -194,6 +195,7 @@ public Mono getPartitionProperties(String partitionId) {
*
* @return A new {@link EventDataBatch} that can fit as many events as the transport allows.
*/
+ @ServiceMethod(returns = ReturnType.SINGLE)
public Mono createBatch() {
return createBatch(DEFAULT_BATCH_OPTIONS);
}
@@ -205,6 +207,7 @@ public Mono createBatch() {
* @return A new {@link EventDataBatch} that can fit as many events as the transport allows.
* @throws NullPointerException if {@code options} is null.
*/
+ @ServiceMethod(returns = ReturnType.SINGLE)
public Mono createBatch(CreateBatchOptions options) {
if (options == null) {
return monoError(logger, new NullPointerException("'options' cannot be null."));
@@ -312,6 +315,7 @@ Mono send(EventData event, SendOptions options) {
* @return A {@link Mono} that completes when all events are pushed to the service.
* @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch.
*/
+ @ServiceMethod(returns = ReturnType.SINGLE)
public Mono send(Iterable events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
@@ -338,6 +342,7 @@ public Mono send(Iterable events) {
* @return A {@link Mono} that completes when all events are pushed to the service.
* @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch.
*/
+ @ServiceMethod(returns = ReturnType.SINGLE)
public Mono send(Iterable events, SendOptions options) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
@@ -392,6 +397,7 @@ Mono send(Flux events, SendOptions options) {
* @see EventHubProducerAsyncClient#createBatch()
* @see EventHubProducerAsyncClient#createBatch(CreateBatchOptions)
*/
+ @ServiceMethod(returns = ReturnType.SINGLE)
public Mono send(EventDataBatch batch) {
if (batch == null) {
return monoError(logger, new NullPointerException("'batch' cannot be null."));
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java
index 700fd8d9c648..f805d67b2c25 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerClient.java
@@ -100,6 +100,7 @@ public EventHubProperties getEventHubProperties() {
*
* @return A Flux of identifiers for the partitions of an Event Hub.
*/
+ @ServiceMethod(returns = ReturnType.COLLECTION)
public IterableStream getPartitionIds() {
return new IterableStream<>(producer.getPartitionIds());
}
@@ -122,6 +123,7 @@ public PartitionProperties getPartitionProperties(String partitionId) {
*
* @return A new {@link EventDataBatch} that can fit as many events as the transport allows.
*/
+ @ServiceMethod(returns = ReturnType.SINGLE)
public EventDataBatch createBatch() {
return producer.createBatch().block(tryTimeout);
}
@@ -135,6 +137,7 @@ public EventDataBatch createBatch() {
*
* @throws NullPointerException if {@code options} is null.
*/
+ @ServiceMethod(returns = ReturnType.SINGLE)
public EventDataBatch createBatch(CreateBatchOptions options) {
return producer.createBatch(options).block(tryTimeout);
}
@@ -151,6 +154,7 @@ public EventDataBatch createBatch(CreateBatchOptions options) {
*
* @param event Event to send to the service.
*/
+ @ServiceMethod(returns = ReturnType.SINGLE)
void send(EventData event) {
producer.send(event).block();
}
@@ -168,6 +172,7 @@ void send(EventData event) {
* @param event Event to send to the service.
* @param options The set of options to consider when sending this event.
*/
+ @ServiceMethod(returns = ReturnType.SINGLE)
void send(EventData event, SendOptions options) {
producer.send(event, options).block();
}
@@ -188,6 +193,7 @@ void send(EventData event, SendOptions options) {
* @param events Events to send to the service.
* @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch.
*/
+ @ServiceMethod(returns = ReturnType.SINGLE)
public void send(Iterable events) {
producer.send(events).block();
}
@@ -209,6 +215,7 @@ public void send(Iterable events) {
* @param options The set of options to consider when sending this batch.
* @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch.
*/
+ @ServiceMethod(returns = ReturnType.SINGLE)
public void send(Iterable events, SendOptions options) {
producer.send(events, options).block();
}
@@ -221,6 +228,7 @@ public void send(Iterable events, SendOptions options) {
* @see EventHubProducerClient#createBatch()
* @see EventHubProducerClient#createBatch(CreateBatchOptions)
*/
+ @ServiceMethod(returns = ReturnType.SINGLE)
public void send(EventDataBatch batch) {
producer.send(batch).block();
}