diff --git a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml
index 969b301ee7bd..4a1822f1e1fe 100755
--- a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml
+++ b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml
@@ -2751,4 +2751,11 @@
+
+
+
+
+
+
+
diff --git a/eng/common/pipelines/templates/steps/sparse-checkout.yml b/eng/common/pipelines/templates/steps/sparse-checkout.yml
index 49f9eb553b04..a3b553b3a7a7 100644
--- a/eng/common/pipelines/templates/steps/sparse-checkout.yml
+++ b/eng/common/pipelines/templates/steps/sparse-checkout.yml
@@ -44,8 +44,10 @@ steps:
Write-Host "git sparse-checkout init"
git sparse-checkout init
- Write-Host "git sparse-checkout set '/*' '!/*/' '/eng'"
- git sparse-checkout set '/*' '!/*/' '/eng'
+ # Set non-cone mode otherwise path filters will not work in git >= 2.37.0
+ # See https://github.blog/2022-06-27-highlights-from-git-2-37/#tidbits
+ Write-Host "git sparse-checkout set --no-cone '/*' '!/*/' '/eng'"
+ git sparse-checkout set --no-cone '/*' '!/*/' '/eng'
}
# Prevent wildcard expansion in Invoke-Expression (e.g. for checkout path '/*')
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java
new file mode 100644
index 000000000000..da0448cfc2f6
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java
@@ -0,0 +1,282 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs;
+
+import com.azure.core.amqp.exception.AmqpErrorCondition;
+import com.azure.core.amqp.exception.AmqpErrorContext;
+import com.azure.core.amqp.exception.AmqpException;
+import com.azure.core.util.logging.ClientLogger;
+import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions;
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscription;
+import reactor.core.CoreSubscriber;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxOperator;
+import reactor.core.publisher.Operators;
+import reactor.core.publisher.Sinks;
+
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
+
+/**
+ * Aggregates {@link EventData} into {@link EventDataBatch} and pushes them downstream when:
+ *
+ * <ul>
+ *     <li>{@link BufferedProducerClientOptions#getMaxWaitTime()} elapses between events.</li>
+ *     <li>{@link EventDataBatch} cannot hold any more events.</li>
+ * </ul>
+ */
+class EventDataAggregator extends FluxOperator<EventData, EventDataBatch> {
+    private static final ClientLogger LOGGER = new ClientLogger(EventDataAggregator.class);
+    private final AtomicReference<EventDataAggregatorMain> downstreamSubscription = new AtomicReference<>();
+    private final Supplier<EventDataBatch> batchSupplier;
+ private final String namespace;
+ private final BufferedProducerClientOptions options;
+
+ /**
+ * Build a {@link FluxOperator} wrapper around the passed parent {@link Publisher}
+ *
+ * @param source the {@link Publisher} to decorate
+ */
+    EventDataAggregator(Flux<? extends EventData> source, Supplier<EventDataBatch> batchSupplier,
+        String namespace, BufferedProducerClientOptions options) {
+ super(source);
+
+ this.batchSupplier = batchSupplier;
+ this.namespace = namespace;
+ this.options = options;
+ }
+
+ /**
+ * Subscribes to events from this operator. Downstream subscribers invoke this method and subscribe to events from
+ * it.
+ *
+ * @param actual Downstream subscriber.
+ */
+ @Override
+    public void subscribe(CoreSubscriber<? super EventDataBatch> actual) {
+ final EventDataAggregatorMain subscription = new EventDataAggregatorMain(actual, namespace, options,
+ batchSupplier, LOGGER);
+
+ if (!downstreamSubscription.compareAndSet(null, subscription)) {
+ throw LOGGER.logThrowableAsError(new IllegalArgumentException(
+ "Cannot resubscribe to multiple upstreams."));
+ }
+
+ source.subscribe(subscription);
+ }
+
+ /**
+ * Main implementation class for subscribing to the upstream source and publishing events downstream.
+ */
+    static class EventDataAggregatorMain implements Subscription, CoreSubscriber<EventData> {
+ /**
+ * The number of requested EventDataBatches.
+ */
+        private volatile long requested;
+        private static final AtomicLongFieldUpdater<EventDataAggregatorMain> REQUESTED =
+            AtomicLongFieldUpdater.newUpdater(EventDataAggregatorMain.class, "requested");
+
+ private static final Duration MAX_TIME = Duration.ofMillis(Long.MAX_VALUE);
+
+        private final Sinks.Many<Long> eventSink;
+ private final Disposable disposable;
+
+ private final AtomicBoolean isCompleted = new AtomicBoolean(false);
+        private final CoreSubscriber<? super EventDataBatch> downstream;
+ private final ClientLogger logger;
+        private final Supplier<EventDataBatch> batchSupplier;
+ private final String namespace;
+ private final Object lock = new Object();
+
+ private Subscription subscription;
+ private EventDataBatch currentBatch;
+
+        EventDataAggregatorMain(CoreSubscriber<? super EventDataBatch> downstream, String namespace,
+            BufferedProducerClientOptions options, Supplier<EventDataBatch> batchSupplier, ClientLogger logger) {
+ this.namespace = namespace;
+ this.downstream = downstream;
+ this.logger = logger;
+ this.batchSupplier = batchSupplier;
+ this.currentBatch = batchSupplier.get();
+
+ this.eventSink = Sinks.many().unicast().onBackpressureError();
+ this.disposable = eventSink.asFlux()
+ .switchMap(value -> {
+ if (value == 0) {
+ return Flux.interval(MAX_TIME, MAX_TIME);
+ } else {
+ return Flux.interval(options.getMaxWaitTime(), options.getMaxWaitTime());
+ }
+ })
+ .subscribe(next -> {
+ logger.verbose("Time elapsed. Publishing batch.");
+ updateOrPublishBatch(null, true);
+ });
+ }
+
+ /**
+ * Request a number of {@link EventDataBatch}.
+ *
+ * @param n Number of batches requested.
+ */
+ @Override
+ public void request(long n) {
+ if (!Operators.validate(n)) {
+ return;
+ }
+
+ Operators.addCap(REQUESTED, this, n);
+ subscription.request(n);
+ }
+
+ /**
+ * Cancels the subscription upstream.
+ */
+ @Override
+ public void cancel() {
+ // Do not keep requesting more events upstream
+ subscription.cancel();
+
+ updateOrPublishBatch(null, true);
+ downstream.onComplete();
+ disposable.dispose();
+ }
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ if (subscription != null) {
+ logger.warning("Subscription was already set. Cancelling existing subscription.");
+ subscription.cancel();
+ } else {
+ subscription = s;
+ downstream.onSubscribe(this);
+ }
+ }
+
+ @Override
+ public void onNext(EventData eventData) {
+ updateOrPublishBatch(eventData, false);
+
+ eventSink.emitNext(1L, Sinks.EmitFailureHandler.FAIL_FAST);
+
+ // When an EventDataBatch is pushed downstream, we decrement REQUESTED. However, if REQUESTED is still > 0,
+ // that means we did not publish the EventDataBatch (ie. because it was not full). We request another
+ // EventData upstream to try and fill this EventDataBatch and push it downstream.
+ final long left = REQUESTED.get(this);
+ if (left > 0) {
+ subscription.request(1L);
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ if (!isCompleted.compareAndSet(false, true)) {
+ Operators.onErrorDropped(t, downstream.currentContext());
+ return;
+ }
+
+ updateOrPublishBatch(null, true);
+ downstream.onError(t);
+ }
+
+ /**
+ * Upstream signals a completion.
+ */
+ @Override
+ public void onComplete() {
+ if (isCompleted.compareAndSet(false, true)) {
+ updateOrPublishBatch(null, true);
+ downstream.onComplete();
+ }
+ }
+
+ /**
+ * @param eventData EventData to add to or null if there are no events to add to the batch.
+ * @param alwaysPublish {@code true} to always push batch downstream. {@code false}, otherwise.
+ */
+ private void updateOrPublishBatch(EventData eventData, boolean alwaysPublish) {
+ if (alwaysPublish) {
+ publishDownstream();
+ return;
+ } else if (eventData == null) {
+ // EventData will be null in the case when options.maxWaitTime() has elapsed and we want to push the
+ // batch downstream.
+ return;
+ }
+
+ boolean added;
+ synchronized (lock) {
+ added = currentBatch.tryAdd(eventData);
+
+ if (added) {
+ return;
+ }
+
+ publishDownstream();
+ added = currentBatch.tryAdd(eventData);
+ }
+
+ if (!added) {
+ final AmqpException error = new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED,
+ "EventData exceeded maximum size.", new AmqpErrorContext(namespace));
+
+ onError(error);
+ }
+ }
+
+ /**
+ * Publishes batch downstream if there are events in the batch and updates it.
+ */
+ private void publishDownstream() {
+ EventDataBatch previous = null;
+
+ try {
+ synchronized (lock) {
+ previous = this.currentBatch;
+
+ if (previous == null) {
+ logger.warning("Batch should not be null, setting a new batch.");
+
+ this.currentBatch = batchSupplier.get();
+ return;
+ } else if (previous.getEvents().isEmpty()) {
+ return;
+ }
+
+ downstream.onNext(previous);
+
+ final long batchesLeft = REQUESTED.updateAndGet(this, (v) -> {
+ if (v == Long.MAX_VALUE) {
+ return v;
+ } else {
+ return v - 1;
+ }
+ });
+
+ logger.verbose(previous + ": Batch published. Requested batches left: {}", batchesLeft);
+
+ if (!isCompleted.get()) {
+ this.currentBatch = batchSupplier.get();
+ } else {
+ logger.verbose("Aggregator is completed. Not setting another batch.");
+ this.currentBatch = null;
+ }
+ }
+ } catch (Throwable e) {
+ final Throwable error = Operators.onNextError(previous, e, downstream.currentContext(), subscription);
+
+ logger.warning("Unable to push batch downstream to publish.", error);
+
+ if (error != null) {
+ onError(error);
+ }
+ }
+ }
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java
new file mode 100644
index 000000000000..2ed75c9cb933
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java
@@ -0,0 +1,202 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs;
+
+import com.azure.core.amqp.exception.AmqpErrorContext;
+import com.azure.core.amqp.exception.AmqpException;
+import com.azure.core.util.logging.ClientLogger;
+import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions;
+import com.azure.messaging.eventhubs.models.CreateBatchOptions;
+import com.azure.messaging.eventhubs.models.SendBatchFailedContext;
+import com.azure.messaging.eventhubs.models.SendBatchSucceededContext;
+import org.reactivestreams.Subscription;
+import reactor.core.Disposable;
+import reactor.core.publisher.BaseSubscriber;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+import reactor.core.scheduler.Schedulers;
+import reactor.util.concurrent.Queues;
+import reactor.util.retry.Retry;
+
+import java.io.Closeable;
+import java.util.Queue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+/**
+ * Keeps track of publishing events to a partition.
+ */
+class EventHubBufferedPartitionProducer implements Closeable {
+ private final ClientLogger logger;
+ private final EventHubProducerAsyncClient client;
+ private final String partitionId;
+ private final AmqpErrorContext errorContext;
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+ private final Disposable publishSubscription;
+    private final Sinks.Many<EventData> eventSink;
+ private final Retry retryWhenPolicy = Retry.from(signal -> {
+ if (isClosed.get()) {
+ return Mono.empty();
+ } else {
+ return Mono.just(true);
+ }
+ });
+ private final CreateBatchOptions createBatchOptions;
+    private final Queue<EventData> eventQueue;
+
+ EventHubBufferedPartitionProducer(EventHubProducerAsyncClient client, String partitionId,
+ BufferedProducerClientOptions options) {
+ this.client = client;
+ this.partitionId = partitionId;
+ this.errorContext = new AmqpErrorContext(client.getFullyQualifiedNamespace());
+ this.createBatchOptions = new CreateBatchOptions().setPartitionId(partitionId);
+
+ this.logger = new ClientLogger(EventHubBufferedPartitionProducer.class + "-" + partitionId);
+
+        final Supplier<Queue<EventData>> queueSupplier = Queues.get(options.getMaxEventBufferLengthPerPartition());
+ this.eventQueue = queueSupplier.get();
+ this.eventSink = Sinks.many().unicast().onBackpressureBuffer(eventQueue);
+
+        final Flux<EventDataBatch> eventDataBatchFlux = Flux.defer(() -> {
+ return new EventDataAggregator(eventSink.asFlux(),
+ () -> {
+                final Mono<EventDataBatch> batch = client.createBatch(createBatchOptions);
+ try {
+ return batch.toFuture().get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException("Unexpected exception when getting batch.", e);
+ }
+ },
+ client.getFullyQualifiedNamespace(), options);
+ }).retryWhen(retryWhenPolicy);
+
+ final PublishResultSubscriber publishResultSubscriber = new PublishResultSubscriber(partitionId,
+ options.getSendSucceededContext(), options.getSendFailedContext(), logger);
+
+ this.publishSubscription = publishEvents(eventDataBatchFlux)
+ .publishOn(Schedulers.boundedElastic(), 1)
+ .subscribeWith(publishResultSubscriber);
+ }
+
+    Mono<Void> enqueueEvent(EventData eventData) {
+ return Mono.create(sink -> {
+ sink.onRequest(request -> {
+ try {
+ eventSink.emitNext(eventData, (signalType, emitResult) -> {
+ // If the draining queue is slower than the publishing queue.
+ return emitResult == Sinks.EmitResult.FAIL_OVERFLOW;
+ });
+ sink.success();
+ } catch (Exception e) {
+ sink.error(new AmqpException(false, "Unable to buffer message for partition: " + getPartitionId(), errorContext));
+ }
+ });
+ });
+ }
+
+ /**
+ * Gets the partition id that this producer is publishing events to.
+ *
+ * @return The partition id that this producer is publishing events to.
+ */
+ String getPartitionId() {
+ return partitionId;
+ }
+
+ /**
+ * Gets the number of events in queue.
+ *
+ * @return the number of events in the queue.
+ */
+ int getBufferedEventCount() {
+ return eventQueue.size();
+ }
+
+ @Override
+ public void close() {
+ if (isClosed.getAndSet(true)) {
+ return;
+ }
+
+ publishSubscription.dispose();
+ client.close();
+ }
+
+ /**
+ * Publishes {@link EventDataBatch} and returns the result.
+ *
+ * @return A stream of published results.
+ */
+    private Flux<PublishResult> publishEvents(Flux<EventDataBatch> upstream) {
+
+ return Flux.defer(() -> {
+ return upstream.flatMap(batch -> {
+ return client.send(batch).thenReturn(new PublishResult(batch, null))
+ // Resuming on error because an error is a terminal signal, so we want to wrap that with a result,
+ // so it doesn't stop publishing.
+ .onErrorResume(error -> Mono.just(new PublishResult(batch, error)));
+ }, 1, 1);
+ }).retryWhen(retryWhenPolicy);
+ }
+
+ /**
+ * Static class to hold results.
+ */
+ private static class PublishResult {
+ private final EventDataBatch batch;
+ private final Throwable error;
+
+ PublishResult(EventDataBatch batch, Throwable error) {
+ this.batch = batch;
+ this.error = error;
+ }
+ }
+
+    private static class PublishResultSubscriber extends BaseSubscriber<PublishResult> {
+        private static final long REQUEST = 1L;
+        private final String partitionId;
+        private final Consumer<SendBatchSucceededContext> onSucceed;
+        private final Consumer<SendBatchFailedContext> onFailed;
+ private final ClientLogger logger;
+
+        PublishResultSubscriber(String partitionId, Consumer<SendBatchSucceededContext> onSucceed,
+            Consumer<SendBatchFailedContext> onFailed, ClientLogger logger) {
+ this.partitionId = partitionId;
+ this.onSucceed = onSucceed;
+ this.onFailed = onFailed;
+ this.logger = logger;
+ }
+
+ @Override
+ protected void hookOnSubscribe(Subscription subscription) {
+ upstream().request(REQUEST);
+ }
+
+ @Override
+ protected void hookOnNext(PublishResult result) {
+ if (result.error == null) {
+ onSucceed.accept(new SendBatchSucceededContext(result.batch.getEvents(), partitionId));
+ } else {
+ onFailed.accept(new SendBatchFailedContext(result.batch.getEvents(), partitionId, result.error));
+ }
+
+ // Request one more PublishResult, which is equivalent to asking for another batch.
+ upstream().request(REQUEST);
+ }
+
+ @Override
+ protected void hookOnError(Throwable throwable) {
+ logger.error("Publishing subscription completed and ended in an error.", throwable);
+ onFailed.accept(new SendBatchFailedContext(null, partitionId, throwable));
+ }
+
+ @Override
+ protected void hookOnComplete() {
+ logger.info("Publishing subscription completed.");
+ }
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient.java
index 3e137a0e3c62..a6c00ea851b5 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient.java
@@ -15,8 +15,7 @@
import java.io.Closeable;
import java.time.Duration;
-import java.util.HashMap;
-import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
/**
@@ -53,31 +52,35 @@
@ServiceClient(builder = EventHubBufferedProducerClientBuilder.class, isAsync = true)
public final class EventHubBufferedProducerAsyncClient implements Closeable {
private final ClientLogger logger = new ClientLogger(EventHubBufferedProducerAsyncClient.class);
- private final EventHubAsyncClient client;
+ private final EventHubProducerAsyncClient client;
private final EventHubClientBuilder builder;
private final BufferedProducerClientOptions clientOptions;
+    private final Mono<Void> initialisationMono;
     // Key: partitionId.
-    private final HashMap<String, ConcurrentLinkedDeque<EventData>> partitionBatchMap = new HashMap<>();
-    private final Mono<Void> initialisationMono;
+    private final ConcurrentHashMap<String, EventHubBufferedPartitionProducer> partitionProducers =
+        new ConcurrentHashMap<>();
EventHubBufferedProducerAsyncClient(EventHubClientBuilder builder, BufferedProducerClientOptions clientOptions) {
this.builder = builder;
- this.client = builder.buildAsyncClient();
+ this.client = builder.buildAsyncProducerClient();
this.clientOptions = clientOptions;
- initialisationMono = Mono.using(
+ this.initialisationMono = Mono.using(
() -> builder.buildAsyncClient(),
- eventHubClient -> eventHubClient.getPartitionIds()
- .handle((partitionId, sink) -> {
- try {
- partitionBatchMap.put(partitionId, new ConcurrentLinkedDeque<>());
- sink.complete();
- } catch (Exception e) {
- sink.error(e);
- }
- }).then(),
- eventHubClient -> eventHubClient.close());
+ eventHubClient -> {
+ return eventHubClient.getPartitionIds()
+ .handle((partitionId, sink) -> {
+ try {
+ partitionProducers.put(partitionId, new EventHubBufferedPartitionProducer(client,
+ partitionId, clientOptions));
+ sink.complete();
+ } catch (Exception e) {
+ sink.error(e);
+ }
+ }).then();
+ },
+ eventHubClient -> eventHubClient.close()).cache();
}
/**
@@ -106,7 +109,7 @@ public String getEventHubName() {
*/
     @ServiceMethod(returns = ReturnType.SINGLE)
     public Mono<EventHubProperties> getEventHubProperties() {
- return client.getProperties();
+ return client.getEventHubProperties();
}
/**
@@ -141,7 +144,10 @@ public Mono getPartitionProperties(String partitionId) {
* partitions.
*/
public int getBufferedEventCount() {
- return 0;
+ return partitionProducers.values()
+ .parallelStream()
+ .mapToInt(producer -> producer.getBufferedEventCount())
+ .sum();
}
/**
@@ -152,7 +158,9 @@ public int getBufferedEventCount() {
* @return The number of events that are buffered and waiting to be published for a given partition.
*/
public int getBufferedEventCount(String partitionId) {
- return 0;
+ final EventHubBufferedPartitionProducer producer = partitionProducers.get(partitionId);
+
+ return producer != null ? producer.getBufferedEventCount() : 0;
}
/**
@@ -255,7 +263,7 @@ static class BufferedProducerClientOptions {
private boolean enableIdempotentRetries = false;
private int maxConcurrentSendsPerPartition = 1;
- private int maxPendingEventCount = 1500;
+ private int maxEventBufferLengthPerPartition = 1500;
private Duration maxWaitTime;
         private Consumer<SendBatchFailedContext> sendFailedContext;
         private Consumer<SendBatchSucceededContext> sendSucceededContext;
@@ -285,12 +293,12 @@ void setMaxConcurrentSendsPerPartition(int maxConcurrentSendsPerPartition) {
this.maxConcurrentSendsPerPartition = maxConcurrentSendsPerPartition;
}
- int getMaxPendingEventCount() {
- return maxPendingEventCount;
+ int getMaxEventBufferLengthPerPartition() {
+ return maxEventBufferLengthPerPartition;
}
- void setMaxPendingEventCount(int maxPendingEventCount) {
- this.maxPendingEventCount = maxPendingEventCount;
+ void maxEventBufferLengthPerPartition(int maxPendingEventCount) {
+ this.maxEventBufferLengthPerPartition = maxPendingEventCount;
}
Duration getMaxWaitTime() {
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClientBuilder.java
index 0c7dfeba650a..4fcf0e163fdc 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClientBuilder.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClientBuilder.java
@@ -271,7 +271,7 @@ public EventHubBufferedProducerClientBuilder maxConcurrentSendsPerPartition(int
* @return The updated {@link EventHubBufferedProducerClientBuilder} object.
*/
public EventHubBufferedProducerClientBuilder maxEventBufferLengthPerPartition(int maxEventBufferLengthPerPartition) {
- clientOptions.setMaxPendingEventCount(maxEventBufferLengthPerPartition);
+ clientOptions.maxEventBufferLengthPerPartition(maxEventBufferLengthPerPartition);
return this;
}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java
index 5947504c73a0..04a327bfcd77 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java
@@ -12,7 +12,7 @@
import java.util.concurrent.atomic.AtomicInteger;
/**
- * Allows events to be resolved to partitions using common patterns such as round-robin assignment and hashing of
+ * Allows events to be resolved to a partition using common patterns such as round-robin assignment and hashing of
* partitions keys.
*/
class PartitionResolver {
@@ -93,24 +93,25 @@ static short generateHashCode(String partitionKey) {
* This implementation is a direct port of the Event Hubs service code; it is intended to match the gateway hashing
* algorithm as closely as possible and should not be adjusted without careful consideration.
*
+ * NOTE: Suppressing fallthrough warning for switch-case because we want it to fall into the subsequent cases.
+ *
* @param data The data to base the hash on.
* @param seed1 Seed value for the first hash.
* @param seed2 Seed value for the second hash.
*
* @return An object containing the computed hash for {@code seed1} and {@code seed2}.
*/
+ @SuppressWarnings("fallthrough")
private static Hashed computeHash(byte[] data, int seed1, int seed2) {
- int a;
- int b;
- int c;
-
- a = b = c = (0xdeadbeef + data.length + seed1);
- c += seed2;
+ int a = (0xdeadbeef + data.length + seed1);
+ int b = a;
+ int c = a + seed2;
final ByteBuffer buffer = ByteBuffer.allocate(data.length)
- .put(data)
- .flip()
- .order(ByteOrder.LITTLE_ENDIAN);
+ .put(data);
+
+ buffer.flip();
+ buffer.order(ByteOrder.LITTLE_ENDIAN);
int index = 0;
int size = data.length;
@@ -154,34 +155,50 @@ private static Hashed computeHash(byte[] data, int seed1, int seed2) {
b += buffer.getInt(index + 4);
c += buffer.getInt(index + 8);
break;
+
+ // fallthrough
case 11:
c += data[index + 10] << 16;
+ // fallthrough
case 10:
c += data[index + 9] << 8;
+ // fallthrough
case 9:
c += data[index + 8];
+ // fallthrough
case 8:
b += buffer.getInt(index + 4);
a += buffer.getInt(index);
break;
+
+ // fallthrough
case 7:
b += data[index + 6] << 16;
+ // fallthrough
case 6:
b += data[index + 5] << 8;
+ // fallthrough
case 5:
b += data[index + 4];
+ // fallthrough
case 4:
a += buffer.getInt(index);
break;
+
+ // fallthrough
case 3:
a += data[index + 2] << 16;
+ // fallthrough
case 2:
a += data[index + 1] << 8;
+ // fallthrough
case 1:
a += data[index];
break;
case 0:
return new Hashed(c, b);
+ default:
+ break;
}
c ^= b;
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java
index 82c3c4748029..6c731d2bb858 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java
@@ -30,7 +30,7 @@ public class PartitionResolverTests {
     public static Stream<List<String>> partitionSetTestCases() {
         final ArrayList<List<String>> arguments = new ArrayList<>();
-        for (var index = 1; index < 8; ++index) {
+        for (int index = 1; index < 8; ++index) {
             final List<String> partitions = IntStream.range(0, index).mapToObj(String::valueOf)
.collect(Collectors.toList());
@@ -73,6 +73,11 @@ public static Stream partitionHashTestCases() {
return arguments.stream();
}
+ /**
+ * Tests that events are distributed equally given the set of partitions.
+ *
+ * @param partitionsList List of partitions to distribute all events between.
+ */
@ParameterizedTest
@MethodSource("partitionSetTestCases")
     public void distributesRoundRobinFairly(List<String> partitionsList) {
@@ -106,7 +111,7 @@ public void distributesRoundRobinFairlyConcurrent(List partitionsList) {
// Create a function that assigns partitions in a loop and track them.
         Mono<Void> roundRobin = Mono.fromRunnable(() -> {
- for (var index = 0; index < iterationCount; index++) {
+ for (int index = 0; index < iterationCount; index++) {
assigned.add(resolver.assignRoundRobin(partitions));
}
});
@@ -148,6 +153,8 @@ public void distributesRoundRobinFairlyConcurrent(List partitionsList) {
/**
* Verifies that the same partition key is assigned to the same partition id.
+ *
+ * @param partitionsList List of partitions.
*/
@ParameterizedTest
@MethodSource("partitionSetTestCases")
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java
new file mode 100644
index 000000000000..60f03efaa9e3
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java
@@ -0,0 +1,319 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs;
+
+import com.azure.core.amqp.exception.AmqpErrorCondition;
+import com.azure.core.amqp.exception.AmqpException;
+import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import reactor.test.StepVerifier;
+import reactor.test.publisher.TestPublisher;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+public class EventDataAggregatorTest {
+ private static final String NAMESPACE = "test.namespace";
+
+ private AutoCloseable mockCloseable;
+
+ @Mock
+ private EventDataBatch batch;
+
+ @Mock
+ private EventDataBatch batch2;
+
+ @Mock
+ private EventDataBatch batch3;
+
+ private final EventData event1 = new EventData("foo");
+ private final EventData event2 = new EventData("bar");
+ private final EventData event3 = new EventData("baz");
+
+ @BeforeEach
+ public void beforeEach() {
+ mockCloseable = MockitoAnnotations.openMocks(this);
+ }
+
+ @AfterEach
+ public void afterEach() throws Exception {
+ if (mockCloseable != null) {
+ mockCloseable.close();
+ }
+
+ Mockito.framework().clearInlineMock(this);
+ }
+
+ /**
+ * Tests that it pushes full batches downstream (when tryAdd returns false). Also, publishes any batches downstream
+ * when upstream completes.
+ */
+ @Test
+ public void pushesFullBatchesDownstream() {
+ // Arrange
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents, event1, event2);
+
+ final List<EventData> batchEvents2 = new ArrayList<>();
+ setupBatchMock(batch2, batchEvents2, event3);
+
+ final Duration waitTime = Duration.ofSeconds(30);
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(waitTime);
+
+ final AtomicInteger first = new AtomicInteger(0);
+ final Supplier<EventDataBatch> supplier = () -> {
+ final int current = first.getAndIncrement();
+
+ switch (current) {
+ case 0:
+ return batch;
+ case 1:
+ return batch2;
+ default:
+ throw new RuntimeException("pushesFullBatchesDownstream: Did not expect to get this many invocations.");
+ }
+ };
+
+ final TestPublisher<EventData> publisher = TestPublisher.createCold();
+ final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options);
+
+ // Act & Assert
+ StepVerifier.create(aggregator)
+ .then(() -> {
+ publisher.next(event1, event2, event3);
+ })
+ .expectNext(batch)
+ .then(() -> publisher.complete())
+ .expectNext(batch2)
+ .expectComplete()
+ .verify(Duration.ofSeconds(10));
+ }
+
+ /**
+ * Tests that it pushes partial batches downstream when an error occurs.
+ */
+ @Test
+ public void pushesBatchesAndError() {
+ // Arrange
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents, event1, event2);
+
+ final Duration waitTime = Duration.ofSeconds(30);
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(waitTime);
+
+ final AtomicBoolean first = new AtomicBoolean();
+ final Supplier<EventDataBatch> supplier = () -> {
+ if (first.compareAndSet(false, true)) {
+ return batch;
+ } else {
+ return batch2;
+ }
+ };
+
+ final TestPublisher<EventData> publisher = TestPublisher.createCold();
+ final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options);
+ final IllegalArgumentException testException = new IllegalArgumentException("Test exception.");
+
+ // Act & Assert
+ StepVerifier.create(aggregator)
+ .then(() -> {
+ publisher.next(event1, event2);
+ publisher.error(testException);
+ })
+ .expectNext(batch)
+ .expectErrorMatches(e -> e.equals(testException))
+ .verify(Duration.ofSeconds(10));
+
+ // Verify that these events were added to the batch.
+ assertEquals(2, batchEvents.size());
+ assertTrue(batchEvents.contains(event1));
+ assertTrue(batchEvents.contains(event2));
+ }
+
+ /**
+ * Tests that batches are pushed downstream when max wait time has elapsed.
+ */
+ @Test
+ public void pushesBatchAfterMaxTime() {
+ // Arrange
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents, event1, event2);
+
+ final List<EventData> batchEvents2 = new ArrayList<>();
+ setupBatchMock(batch2, batchEvents2, event3);
+
+ final Duration waitTime = Duration.ofSeconds(5);
+ final Duration halfWaitTime = waitTime.minusSeconds(2);
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(waitTime);
+
+ final AtomicInteger first = new AtomicInteger(0);
+ final Supplier<EventDataBatch> supplier = () -> {
+ final int current = first.getAndIncrement();
+
+ switch (current) {
+ case 0:
+ return batch;
+ case 1:
+ return batch2;
+ default:
+ System.out.println("Invoked get batch for the xth time:" + current);
+ return batch3;
+ }
+ };
+
+ final TestPublisher<EventData> publisher = TestPublisher.createCold();
+ final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options);
+
+ // Act & Assert
+ StepVerifier.create(aggregator)
+ .then(() -> publisher.next(event1))
+ .thenAwait(halfWaitTime)
+ .then(() -> {
+ assertEquals(1, batchEvents.size());
+
+ publisher.next(event2);
+ })
+ .thenAwait(waitTime)
+ .assertNext(b -> {
+ assertEquals(b, batch);
+ assertEquals(2, batchEvents.size());
+ })
+ .expectNoEvent(waitTime)
+ .then(() -> publisher.next(event3))
+ .thenAwait(waitTime)
+ .assertNext(e -> {
+ assertEquals(e, batch2, "Should be equal.");
+ })
+ .thenCancel()
+ .verify();
+ }
+
+ /**
+ * Verifies that an error is propagated when it is too large for the link.
+ */
+ @Test
+ public void errorsOnEventThatDoesNotFit() {
+ // Arrange
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents);
+
+ final Duration waitTime = Duration.ofSeconds(30);
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(waitTime);
+
+ final AtomicBoolean first = new AtomicBoolean();
+ final Supplier<EventDataBatch> supplier = () -> {
+ if (first.compareAndSet(false, true)) {
+ return batch;
+ } else {
+ throw new IllegalArgumentException("Did not expect another batch call.");
+ }
+ };
+
+ final TestPublisher<EventData> publisher = TestPublisher.createCold();
+ final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options);
+
+ StepVerifier.create(aggregator)
+ .then(() -> {
+ publisher.next(event1);
+ })
+ .consumeErrorWith(error -> {
+ assertTrue(error instanceof AmqpException);
+ assertEquals(AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, ((AmqpException) error).getErrorCondition());
+ })
+ .verify(Duration.ofSeconds(20));
+ }
+
+ /**
+ * Verifies that backpressure requests are supported.
+ */
+ @Test
+ public void respectsBackpressure() {
+ // Arrange
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents, event1);
+
+ final List<EventData> batchEvents2 = new ArrayList<>();
+ setupBatchMock(batch2, batchEvents2);
+
+ final Duration waitTime = Duration.ofSeconds(3);
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(waitTime);
+
+ final AtomicInteger first = new AtomicInteger(0);
+ final Supplier<EventDataBatch> supplier = () -> {
+ final int current = first.getAndIncrement();
+
+ switch (current) {
+ case 0:
+ return batch;
+ case 1:
+ return batch2;
+ default:
+ throw new RuntimeException("respectsBackpressure: Did not expect to get this many invocations.");
+ }
+ };
+
+ final TestPublisher<EventData> publisher = TestPublisher.createCold();
+ final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options);
+
+ final long request = 1L;
+
+ StepVerifier.create(aggregator, request)
+ .then(() -> publisher.next(event1))
+ .assertNext(b -> {
+ assertEquals(1, b.getCount());
+ assertTrue(b.getEvents().contains(event1));
+ })
+ .expectNoEvent(waitTime)
+ .thenCancel()
+ .verify();
+
+ publisher.assertMaxRequested(request);
+ }
+
+ /**
+ * Helper method to set up mocked {@link EventDataBatch} to accept the given events in {@code resultSet}.
+ *
+ * @param batch Mocked batch.
+ * @param resultSet List to store added EventData in.
+ * @param acceptedEvents EventData that is accepted by the batch when {@link EventDataBatch#tryAdd(EventData)}
+ * is invoked.
+ */
+ static void setupBatchMock(EventDataBatch batch, List<EventData> resultSet, EventData... acceptedEvents) {
+ when(batch.tryAdd(any(EventData.class))).thenAnswer(invocation -> {
+ final EventData arg = invocation.getArgument(0);
+
+ final boolean matches = Arrays.asList(acceptedEvents).contains(arg);
+
+ if (matches) {
+ resultSet.add(arg);
+ }
+
+ return matches;
+ });
+ when(batch.getEvents()).thenAnswer(invocation -> {
+ return resultSet;
+ });
+ when(batch.getCount()).thenAnswer(invocation -> resultSet.size());
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java
new file mode 100644
index 000000000000..72546d4e0f8f
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java
@@ -0,0 +1,355 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs;
+
+import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions;
+import com.azure.messaging.eventhubs.models.CreateBatchOptions;
+import com.azure.messaging.eventhubs.models.SendBatchFailedContext;
+import com.azure.messaging.eventhubs.models.SendBatchSucceededContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import static com.azure.messaging.eventhubs.EventDataAggregatorTest.setupBatchMock;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@link EventHubBufferedPartitionProducer}
+ */
+public class EventHubBufferedPartitionProducerTest {
+ private static final String PARTITION_ID = "10";
+ private static final String NAMESPACE = "test-eventhubs-namespace";
+ private static final String EVENT_HUB_NAME = "test-hub";
+ private static final List<String> PARTITION_IDS = Arrays.asList("one", "two", PARTITION_ID, "four");
+
+ private AutoCloseable mockCloseable;
+
+ private final Semaphore successSemaphore = new Semaphore(1);
+ private final Semaphore failedSemaphore = new Semaphore(1);
+ private final EventData event1 = new EventData("foo");
+ private final EventData event2 = new EventData("bar");
+ private final EventData event3 = new EventData("baz");
+ private final EventData event4 = new EventData("bart");
+
+ private final Queue<EventDataBatch> returnedBatches = new LinkedList<>();
+
+ @Mock
+ private EventHubProducerAsyncClient client;
+
+ @Mock
+ private EventDataBatch batch;
+
+ @Mock
+ private EventDataBatch batch2;
+
+ @Mock
+ private EventDataBatch batch3;
+
+ @Mock
+ private EventDataBatch batch4;
+
+ @Mock
+ private EventDataBatch batch5;
+
+ @BeforeEach
+ public void beforeEach() {
+ mockCloseable = MockitoAnnotations.openMocks(this);
+
+ returnedBatches.add(batch);
+ returnedBatches.add(batch2);
+ returnedBatches.add(batch3);
+ returnedBatches.add(batch4);
+ returnedBatches.add(batch5);
+
+ when(client.getFullyQualifiedNamespace()).thenReturn(NAMESPACE);
+ when(client.getEventHubName()).thenReturn(EVENT_HUB_NAME);
+ when(client.getPartitionIds()).thenReturn(Flux.fromIterable(PARTITION_IDS));
+
+ when(client.createBatch(any(CreateBatchOptions.class))).thenAnswer(invocation -> {
+ final EventDataBatch returned = returnedBatches.poll();
+ assertNotNull(returned, "there should be more batches to be returned.");
+ return Mono.just(returned);
+ });
+ }
+
+ @AfterEach
+ public void afterEach() throws Exception {
+ if (mockCloseable != null) {
+ mockCloseable.close();
+ }
+
+ Mockito.framework().clearInlineMock(this);
+ }
+
+ @Test
+ public void publishesEvents() throws InterruptedException {
+ // Arrange
+ successSemaphore.acquire();
+ failedSemaphore.acquire();
+
+ final InvocationHolder holder = new InvocationHolder();
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(Duration.ofSeconds(5));
+ options.setSendSucceededContext(holder::onSucceed);
+ options.setSendFailedContext(holder::onFailed);
+
+ final Duration waitTime = options.getMaxWaitTime().plus(options.getMaxWaitTime());
+
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents, event1, event2);
+
+ when(client.send(any(EventDataBatch.class))).thenReturn(Mono.empty());
+
+ final EventHubBufferedPartitionProducer producer = new EventHubBufferedPartitionProducer(client, PARTITION_ID,
+ options);
+
+ // Act & Assert
+ StepVerifier.create(producer.enqueueEvent(event1))
+ .verifyComplete();
+
+ StepVerifier.create(producer.enqueueEvent(event2))
+ .verifyComplete();
+
+ assertTrue(successSemaphore.tryAcquire(waitTime.toMillis(), TimeUnit.MILLISECONDS),
+ "Should have been able to get a successful batch pushed downstream.");
+
+ assertEquals(1, holder.succeededContexts.size());
+
+ final SendBatchSucceededContext first = holder.succeededContexts.get(0);
+ assertEquals(PARTITION_ID, first.getPartitionId());
+ assertEquals(batchEvents, first.getEvents());
+
+ assertEquals(2, batchEvents.size());
+
+ assertTrue(holder.failedContexts.isEmpty());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void publishesErrors() throws InterruptedException {
+ // Arrange
+ successSemaphore.acquire();
+ failedSemaphore.acquire();
+
+ final InvocationHolder holder = new InvocationHolder();
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(Duration.ofSeconds(5));
+ options.setSendSucceededContext(holder::onSucceed);
+ options.setSendFailedContext(holder::onFailed);
+
+ final Duration waitTime = options.getMaxWaitTime().plus(options.getMaxWaitTime());
+
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents, event1, event2);
+
+ final List<EventData> batchEvents2 = new ArrayList<>();
+ setupBatchMock(batch2, batchEvents2, event3, event4);
+
+ final Throwable error = new IllegalStateException("test-options.");
+ when(client.send(any(EventDataBatch.class))).thenReturn(Mono.empty(), Mono.error(error));
+
+ final EventHubBufferedPartitionProducer producer = new EventHubBufferedPartitionProducer(client, PARTITION_ID,
+ options);
+
+ // Act & Assert
+ StepVerifier.create(Mono.when(producer.enqueueEvent(event1), producer.enqueueEvent(event2)))
+ .verifyComplete();
+
+ StepVerifier.create(Mono.when(producer.enqueueEvent(event3)))
+ .thenAwait(options.getMaxWaitTime())
+ .verifyComplete();
+
+ assertTrue(successSemaphore.tryAcquire(waitTime.toMillis(), TimeUnit.MILLISECONDS),
+ "Should have been able to get a successful signal downstream.");
+
+ assertTrue(failedSemaphore.tryAcquire(waitTime.toMillis(), TimeUnit.MILLISECONDS),
+ "Should have been able to get a successful error downstream.");
+
+ assertEquals(1, holder.succeededContexts.size());
+
+ final SendBatchSucceededContext first = holder.succeededContexts.get(0);
+ assertEquals(PARTITION_ID, first.getPartitionId());
+ assertEquals(batchEvents, first.getEvents());
+
+ assertEquals(2, batchEvents.size());
+
+ assertEquals(1, holder.failedContexts.size());
+ }
+
+ /**
+ * Checks that after an error publishing one batch, it can still publish subsequent batches successfully.
+ */
+ @Test
+ public void canPublishAfterErrors() throws InterruptedException {
+ // Arrange
+ final CountDownLatch success = new CountDownLatch(2);
+ failedSemaphore.acquire();
+
+ final InvocationHolder holder = new InvocationHolder();
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(Duration.ofSeconds(5));
+ options.setSendSucceededContext(context -> {
+ holder.onSucceed(context);
+ success.countDown();
+ });
+ options.setSendFailedContext(context -> holder.onFailed(context));
+
+ final Duration waitTime = options.getMaxWaitTime().plus(options.getMaxWaitTime());
+
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents, event1, event2);
+
+ final List<EventData> batchEvents2 = new ArrayList<>();
+ setupBatchMock(batch2, batchEvents2, event3);
+
+ final List<EventData> batchEvents3 = new ArrayList<>();
+ final EventData event5 = new EventData("five");
+ setupBatchMock(batch3, batchEvents3, event4, event5);
+
+ final Throwable error = new IllegalStateException("test-options.");
+
+ final Queue<Mono<Void>> responses = new LinkedList<>();
+ responses.add(Mono.empty());
+ responses.add(Mono.error(error));
+ responses.add(Mono.empty());
+ when(client.send(any(EventDataBatch.class))).thenAnswer(invocation -> {
+ return responses.poll();
+ });
+
+ final EventHubBufferedPartitionProducer producer = new EventHubBufferedPartitionProducer(client, PARTITION_ID,
+ options);
+
+ // Act & Assert
+ StepVerifier.create(Mono.when(producer.enqueueEvent(event1), producer.enqueueEvent(event2)))
+ .verifyComplete();
+
+ StepVerifier.create(producer.enqueueEvent(event3))
+ .thenAwait(options.getMaxWaitTime())
+ .verifyComplete();
+
+ StepVerifier.create(Mono.when(producer.enqueueEvent(event4), producer.enqueueEvent(event5)))
+ .thenAwait(options.getMaxWaitTime())
+ .verifyComplete();
+
+ assertTrue(success.await(waitTime.toMillis(), TimeUnit.MILLISECONDS),
+ "Should have been able to get a successful signal downstream.");
+
+ assertTrue(failedSemaphore.tryAcquire(waitTime.toMillis(), TimeUnit.MILLISECONDS),
+ "Should have been able to get a successful error downstream.");
+
+ assertEquals(2, holder.succeededContexts.size());
+
+ // Verify the completed ones.
+ final SendBatchSucceededContext first = holder.succeededContexts.get(0);
+ assertEquals(PARTITION_ID, first.getPartitionId());
+ assertEquals(batchEvents, first.getEvents());
+
+ final SendBatchSucceededContext second = holder.succeededContexts.get(1);
+ assertEquals(PARTITION_ID, second.getPartitionId());
+ assertEquals(batchEvents3, second.getEvents());
+
+ // Verify the failed ones.
+ assertEquals(1, holder.failedContexts.size());
+ }
+
+ @Test
+ public void getBufferedEventCounts() throws InterruptedException {
+ // Arrange
+ final CountDownLatch success = new CountDownLatch(2);
+ failedSemaphore.acquire();
+
+ final InvocationHolder holder = new InvocationHolder();
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(Duration.ofSeconds(3));
+ options.setSendSucceededContext(context -> {
+ System.out.println("Batch received.");
+ holder.onSucceed(context);
+ success.countDown();
+ });
+ options.setSendFailedContext(context -> holder.onFailed(context));
+
+ final Duration waitTime = options.getMaxWaitTime().plus(options.getMaxWaitTime());
+
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents, event1);
+
+ final List<EventData> batchEvents2 = new ArrayList<>();
+ setupBatchMock(batch2, batchEvents2, event2, event3);
+
+ final List<EventData> batchEvents3 = new ArrayList<>();
+ final EventData event5 = new EventData("five");
+ setupBatchMock(batch3, batchEvents3, event4, event5);
+
+ // Delaying send operation.
+ when(client.send(any(EventDataBatch.class))).thenAnswer(invocation -> Mono.delay(options.getMaxWaitTime()).then());
+
+ final EventHubBufferedPartitionProducer producer = new EventHubBufferedPartitionProducer(client, PARTITION_ID,
+ options);
+
+ // Act & Assert
+ StepVerifier.create(Mono.when(producer.enqueueEvent(event1), producer.enqueueEvent(event2), producer.enqueueEvent(event3)), 1L)
+ .then(() -> {
+ // event1 was enqueued, event2 is in a batch, and event3 is currently in the queue waiting to be
+ // pushed downstream.
+ // batch1 (with event1) is being sent at the moment with the delay of options.getMaxWaitTime(), so the
+ // buffer doesn't drain so quickly.
+ final int bufferedEventCount = producer.getBufferedEventCount();
+ assertEquals(1, bufferedEventCount);
+ })
+ .verifyComplete();
+
+ StepVerifier.create(Mono.when(producer.enqueueEvent(event4), producer.enqueueEvent(event5)))
+ .verifyComplete();
+
+ final long totalTime = waitTime.toMillis() + waitTime.toMillis();
+ assertTrue(success.await(totalTime, TimeUnit.MILLISECONDS),
+ "Should have been able to get a successful signal downstream.");
+
+ assertEquals(2, holder.succeededContexts.size());
+
+ // Verify the completed ones.
+ final SendBatchSucceededContext first = holder.succeededContexts.get(0);
+ assertEquals(PARTITION_ID, first.getPartitionId());
+ assertEquals(batchEvents, first.getEvents());
+
+ final SendBatchSucceededContext second = holder.succeededContexts.get(1);
+ assertEquals(PARTITION_ID, second.getPartitionId());
+ assertEquals(batchEvents2, second.getEvents());
+ }
+
+ private class InvocationHolder {
+ private final List<SendBatchSucceededContext> succeededContexts = new ArrayList<>();
+ private final List<SendBatchFailedContext> failedContexts = new ArrayList<>();
+
+ void onSucceed(SendBatchSucceededContext result) {
+ succeededContexts.add(result);
+ successSemaphore.release();
+ }
+
+ void onFailed(SendBatchFailedContext result) {
+ failedContexts.add(result);
+ failedSemaphore.release();
+ }
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/sdk/eventhubs/azure-messaging-eventhubs/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 000000000000..1f0955d450f0
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline