Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,7 @@
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files=".*[/\\]textanalytics[/\\].*"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files=".*[/\\]schemaregistry[/\\].*"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files=".*[/\\]monitor[/\\].*"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files=".*[/\\]eventhubs[/\\].*"/>

<!-- Don't enforce non-static ClientLogger instances in azure-core-mgmt PollerFactory and PollingState types-->
<suppress checks="com\.azure\.tools\.checkstyle\.checks\.(ThrowFromClientLoggerCheck|GoodLoggingCheck)"
Expand Down Expand Up @@ -565,6 +566,12 @@
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="com.azure.communication.administration.PhoneNumberClient.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="com.azure.communication.administration.PhoneNumberAsyncClient.java"/>

<!-- Checkstyle suppression for Event Hubs client APIs that use Flux instead of PagedFlux for methods that return a collection -->
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="com.azure.messaging.eventhubs.EventHubConsumerClient.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="com.azure.messaging.eventhubs.EventHubConsumerAsyncClient.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="com.azure.messaging.eventhubs.EventHubProducerClient.java"/>
<suppress checks="com.azure.tools.checkstyle.checks.ServiceClientCheck" files="com.azure.messaging.eventhubs.EventHubProducerAsyncClient.java"/>

<!-- ### begin: Spring related suppression -->
<!-- GoodLoggingCheck: Spring as a framework, will not directly use ClientLogger-->
<suppress checks="com.azure.tools.checkstyle.checks.(GoodLoggingCheck|ThrowFromClientLoggerCheck)" files=".*[/\\]com[/\\]azure[/\\]spring[/\\].*"></suppress>
Expand Down Expand Up @@ -603,11 +610,11 @@
<suppress checks="com.azure.tools.checkstyle.checks.EnforceFinalFieldsCheck" files="com.azure.spring.keyvault.KeyVaultOperation.java"/>

<suppress checks="MethodName" files="com.azure.spring.data.gremlin.common.GremlinConfig.java"/>

<suppress checks="MethodName" files="com.azure.spring.integration.core.api.CheckpointConfig.java"/>
<suppress checks="MethodName" files="com.azure.spring.integration.servicebus.ServiceBusClientConfig.java"/>

<!-- Checkstyle suppressions for azure.spring.data.cosmos package -->
<!-- Checkstyle suppressions for azure.spring.data.cosmos package -->
<suppress checks="MethodName|MemberName|ParameterName" files="src[/\\]test[/\\]java[/\\]com[/\\]azure[/\\]spring[/\\]data[/\\]cosmos[/\\]domain|repository.*.java"/>
<suppress checks="MethodName" files="com.azure.spring.data.cosmos.config.CosmosConfig.java"/>
<suppress checks="PackageName" files="com.azure.spring.data.cosmos.multidatasource.*\.java"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,12 @@
"code": "java\\.class\\.externalClassExposedInAPI",
"new": "(interface|class|enum) io\\.opentelemetry.*",
"justification": "Azure Monitor Exporter is allowed to use OpenTelemetry types in public APIs as it implements interfaces defined by OpenTelemetry"
},
{
"code": "java.annotation.attributeAdded",
"old": "class com.azure.messaging.eventhubs.EventHubClientBuilder",
"new": "class com.azure.messaging.eventhubs.EventHubClientBuilder",
"justification": "Setting protocol to AMQP in @ServiceClientBuilder annotation is not a breaking change"
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.annotation.ServiceClientBuilder;
import com.azure.core.annotation.ServiceClientProtocol;
import com.azure.core.credential.TokenCredential;
import com.azure.core.exception.AzureException;
import com.azure.core.util.ClientOptions;
Expand Down Expand Up @@ -94,7 +95,7 @@
* @see EventHubConsumerClient
*/
@ServiceClientBuilder(serviceClients = {EventHubProducerAsyncClient.class, EventHubProducerClient.class,
EventHubConsumerAsyncClient.class, EventHubConsumerClient.class})
EventHubConsumerAsyncClient.class, EventHubConsumerClient.class}, protocol = ServiceClientProtocol.AMQP)
public class EventHubClientBuilder {

// Default number of events to fetch when creating the consumer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public Mono<EventHubProperties> getEventHubProperties() {
*
* @return A Flux of identifiers for the partitions of an Event Hub.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public Flux<String> getPartitionIds() {
return getEventHubProperties().flatMapMany(properties -> Flux.fromIterable(properties.getPartitionIds()));
}
Expand Down Expand Up @@ -177,6 +178,7 @@ public Mono<PartitionProperties> getPartitionProperties(String partitionId) {
* @throws NullPointerException if {@code partitionId}, or {@code startingPosition} is null.
* @throws IllegalArgumentException if {@code partitionId} is an empty string.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition) {
return receiveFromPartition(partitionId, startingPosition, defaultReceiveOptions);
}
Expand Down Expand Up @@ -205,6 +207,7 @@ public Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPositi
* null.
* @throws IllegalArgumentException if {@code partitionId} is an empty string.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPosition startingPosition,
ReceiveOptions receiveOptions) {
if (Objects.isNull(partitionId)) {
Expand Down Expand Up @@ -235,6 +238,7 @@ public Flux<PartitionEvent> receiveFromPartition(String partitionId, EventPositi
*
* @return A stream of events for every partition in the Event Hub starting from the beginning of each partition.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public Flux<PartitionEvent> receive() {
return receive(true, defaultReceiveOptions);
}
Expand All @@ -256,6 +260,7 @@ public Flux<PartitionEvent> receive() {
*
* @return A stream of events for every partition in the Event Hub.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent) {
return receive(startReadingAtEarliestEvent, defaultReceiveOptions);
}
Expand Down Expand Up @@ -289,6 +294,7 @@ public Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent) {
*
* @throws NullPointerException if {@code receiveOptions} is null.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public Flux<PartitionEvent> receive(boolean startReadingAtEarliestEvent, ReceiveOptions receiveOptions) {
if (Objects.isNull(receiveOptions)) {
return fluxError(logger, new NullPointerException("'receiveOptions' cannot be null."));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public EventHubProperties getEventHubProperties() {
*
* @return The set of identifiers for the partitions of an Event Hub.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public IterableStream<String> getPartitionIds() {
return new IterableStream<>(consumer.getPartitionIds());
}
Expand Down Expand Up @@ -128,6 +129,7 @@ public PartitionProperties getPartitionProperties(String partitionId) {
* @throws IllegalArgumentException if {@code maximumMessageCount} is less than 1, or if {@code partitionId} is an
* empty string.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount,
EventPosition startingPosition) {
return receiveFromPartition(partitionId, maximumMessageCount, startingPosition, timeout);
Expand All @@ -150,6 +152,7 @@ public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, i
* @throws IllegalArgumentException if {@code maximumMessageCount} is less than 1 or {@code maximumWaitTime} is
* zero or a negative duration.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount,
EventPosition startingPosition, Duration maximumWaitTime) {
if (Objects.isNull(maximumWaitTime)) {
Expand Down Expand Up @@ -197,6 +200,7 @@ public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, i
* @throws IllegalArgumentException if {@code maximumMessageCount} is less than 1 or {@code maximumWaitTime} is
* zero or a negative duration.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public IterableStream<PartitionEvent> receiveFromPartition(String partitionId, int maximumMessageCount,
EventPosition startingPosition, Duration maximumWaitTime, ReceiveOptions receiveOptions) {
if (Objects.isNull(maximumWaitTime)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ public Mono<EventHubProperties> getEventHubProperties() {
*
* @return A Flux of identifiers for the partitions of an Event Hub.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public Flux<String> getPartitionIds() {
return getEventHubProperties().flatMapMany(properties -> Flux.fromIterable(properties.getPartitionIds()));
}
Expand All @@ -194,6 +195,7 @@ public Mono<PartitionProperties> getPartitionProperties(String partitionId) {
*
* @return A new {@link EventDataBatch} that can fit as many events as the transport allows.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<EventDataBatch> createBatch() {
return createBatch(DEFAULT_BATCH_OPTIONS);
}
Expand All @@ -205,6 +207,7 @@ public Mono<EventDataBatch> createBatch() {
* @return A new {@link EventDataBatch} that can fit as many events as the transport allows.
* @throws NullPointerException if {@code options} is null.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<EventDataBatch> createBatch(CreateBatchOptions options) {
if (options == null) {
return monoError(logger, new NullPointerException("'options' cannot be null."));
Expand Down Expand Up @@ -312,6 +315,7 @@ Mono<Void> send(EventData event, SendOptions options) {
* @return A {@link Mono} that completes when all events are pushed to the service.
* @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> send(Iterable<EventData> events) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
Expand All @@ -338,6 +342,7 @@ public Mono<Void> send(Iterable<EventData> events) {
* @return A {@link Mono} that completes when all events are pushed to the service.
* @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> send(Iterable<EventData> events, SendOptions options) {
if (events == null) {
return monoError(logger, new NullPointerException("'events' cannot be null."));
Expand Down Expand Up @@ -392,6 +397,7 @@ Mono<Void> send(Flux<EventData> events, SendOptions options) {
* @see EventHubProducerAsyncClient#createBatch()
* @see EventHubProducerAsyncClient#createBatch(CreateBatchOptions)
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> send(EventDataBatch batch) {
if (batch == null) {
return monoError(logger, new NullPointerException("'batch' cannot be null."));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public EventHubProperties getEventHubProperties() {
*
* @return A Flux of identifiers for the partitions of an Event Hub.
*/
@ServiceMethod(returns = ReturnType.COLLECTION)
public IterableStream<String> getPartitionIds() {
return new IterableStream<>(producer.getPartitionIds());
}
Expand All @@ -122,6 +123,7 @@ public PartitionProperties getPartitionProperties(String partitionId) {
*
* @return A new {@link EventDataBatch} that can fit as many events as the transport allows.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public EventDataBatch createBatch() {
return producer.createBatch().block(tryTimeout);
}
Expand All @@ -135,6 +137,7 @@ public EventDataBatch createBatch() {
*
* @throws NullPointerException if {@code options} is null.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public EventDataBatch createBatch(CreateBatchOptions options) {
return producer.createBatch(options).block(tryTimeout);
}
Expand All @@ -151,6 +154,7 @@ public EventDataBatch createBatch(CreateBatchOptions options) {
*
* @param event Event to send to the service.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
void send(EventData event) {
producer.send(event).block();
}
Expand All @@ -168,6 +172,7 @@ void send(EventData event) {
* @param event Event to send to the service.
* @param options The set of options to consider when sending this event.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
void send(EventData event, SendOptions options) {
producer.send(event, options).block();
}
Expand All @@ -188,6 +193,7 @@ void send(EventData event, SendOptions options) {
* @param events Events to send to the service.
* @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public void send(Iterable<EventData> events) {
producer.send(events).block();
}
Expand All @@ -209,6 +215,7 @@ public void send(Iterable<EventData> events) {
* @param options The set of options to consider when sending this batch.
* @throws AmqpException if the size of {@code events} exceed the maximum size of a single batch.
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public void send(Iterable<EventData> events, SendOptions options) {
producer.send(events, options).block();
}
Expand All @@ -221,6 +228,7 @@ public void send(Iterable<EventData> events, SendOptions options) {
* @see EventHubProducerClient#createBatch()
* @see EventHubProducerClient#createBatch(CreateBatchOptions)
*/
@ServiceMethod(returns = ReturnType.SINGLE)
public void send(EventDataBatch batch) {
producer.send(batch).block();
}
Expand Down