events, SendOptions options) {
+ return client.enqueueEvents(events, options).block(operationTimeout);
+ }
+
+ /**
+ * Attempts to publish all events in the buffer immediately. This may result in multiple batches being published,
+ * the outcome of each of which will be individually reported by the {@link EventHubBufferedProducerClientBuilder#onSendBatchFailed(Consumer)}
+ * and {@link EventHubBufferedProducerClientBuilder#onSendBatchSucceeded(Consumer)} handlers.
+ *
+ * Upon completion of this method, the buffer will be empty.
+ */
+ public void flush() {
+ client.flush().block(operationTimeout.plus(operationTimeout));
+ }
+
+ /**
+ * Disposes of the producer and all its resources.
+ */
+ @Override
+ public void close() {
+ client.close();
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClientBuilder.java
new file mode 100644
index 000000000000..b1f431c4ccea
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClientBuilder.java
@@ -0,0 +1,428 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs;
+
+import com.azure.core.amqp.AmqpRetryOptions;
+import com.azure.core.amqp.AmqpTransportType;
+import com.azure.core.amqp.ProxyOptions;
+import com.azure.core.annotation.ServiceClientBuilder;
+import com.azure.core.annotation.ServiceClientProtocol;
+import com.azure.core.credential.AzureNamedKeyCredential;
+import com.azure.core.credential.AzureSasCredential;
+import com.azure.core.credential.TokenCredential;
+import com.azure.core.exception.AzureException;
+import com.azure.core.util.ClientOptions;
+import com.azure.core.util.Configuration;
+import com.azure.core.util.logging.ClientLogger;
+import com.azure.messaging.eventhubs.models.SendBatchFailedContext;
+import com.azure.messaging.eventhubs.models.SendBatchSucceededContext;
+
+import java.net.URL;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+import static com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions;
+
+/**
+ * Builder used to instantiate {@link EventHubBufferedProducerClient} and {@link EventHubBufferedProducerAsyncClient}.
+ *
+ * @see EventHubBufferedProducerClient
+ * @see EventHubBufferedProducerAsyncClient
+ */
+@ServiceClientBuilder(
+ serviceClients = {EventHubBufferedProducerAsyncClient.class, EventHubBufferedProducerClient.class},
+ protocol = ServiceClientProtocol.AMQP)
+public final class EventHubBufferedProducerClientBuilder {
+ private static final ClientLogger LOGGER = new ClientLogger(EventHubBufferedProducerClientBuilder.class);
+
+ private final EventHubClientBuilder builder;
+ private final BufferedProducerClientOptions clientOptions = new BufferedProducerClientOptions();
+ private final PartitionResolver partitionResolver = new PartitionResolver();
+ private AmqpRetryOptions retryOptions;
+
+ /**
+ * Creates a new instance with the following defaults:
+ *
+ * - {@link #maxEventBufferLengthPerPartition(int)} is 1500
+ * - {@link #transportType(AmqpTransportType)} is {@link AmqpTransportType#AMQP}
+ * - {@link #maxConcurrentSendsPerPartition(int)} is 1
+ * - {@link #maxConcurrentSends(int)} is 1
+ * - {@link #maxWaitTime(Duration)} is 30 seconds
+ *
+ */
+ public EventHubBufferedProducerClientBuilder() {
+ builder = new EventHubClientBuilder();
+ }
+
+ /**
+ * Sets the client options.
+ *
+ * @param clientOptions The client options.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ */
+ public EventHubBufferedProducerClientBuilder clientOptions(ClientOptions clientOptions) {
+ builder.clientOptions(clientOptions);
+ return this;
+ }
+
+ /**
+ * Sets the configuration store that is used during construction of the service client.
+ *
+ * If not specified, the default configuration store is used to configure the buffered producer. Use {@link
+ * Configuration#NONE} to bypass using configuration settings during construction.
+ *
+ * @param configuration The configuration store used to configure the buffered producer.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ */
+ public EventHubBufferedProducerClientBuilder configuration(Configuration configuration) {
+ builder.configuration(configuration);
+ return this;
+ }
+
+ /**
+ * Sets the credential information given a connection string to the Event Hub instance.
+ *
+ *
+ * If the connection string is copied from the Event Hubs namespace, it will likely not contain the name to the
+ * desired Event Hub, which is needed. In this case, the name can be added manually by adding {@literal
+ * "EntityPath=EVENT_HUB_NAME"} to the end of the connection string. For example, "EntityPath=telemetry-hub".
+ *
+ *
+ *
+ * If you have defined a shared access policy directly on the Event Hub itself, then copying the connection string
+ * from that Event Hub will result in a connection string that contains the name.
+ *
+ *
+ * @param connectionString The connection string to use for connecting to the Event Hub instance. It is expected
+ * that the Event Hub name and the shared access key properties are contained in this connection string.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ *
+ * @throws IllegalArgumentException if {@code connectionString} is null or empty. Or, the {@code
+ * connectionString} does not contain the "EntityPath" key, which is the name of the Event Hub instance.
+ * @throws AzureException If the shared access signature token credential could not be created using the
+ * connection string.
+ */
+ public EventHubBufferedProducerClientBuilder connectionString(String connectionString) {
+ builder.connectionString(connectionString);
+ return this;
+ }
+
+ /**
+ * Sets the credential information given a connection string to the Event Hubs namespace and name to a specific
+ * Event Hub instance.
+ *
+ * @param connectionString The connection string to use for connecting to the Event Hubs namespace; it is
+ * expected that the shared access key properties are contained in this connection string, but not the Event Hub
+ * name.
+ * @param eventHubName The name of the Event Hub to connect the client to.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ *
+ * @throws NullPointerException if {@code connectionString} or {@code eventHubName} is null.
+ * @throws IllegalArgumentException if {@code connectionString} or {@code eventHubName} is an empty string. Or,
+ * if the {@code connectionString} contains the Event Hub name.
+ * @throws AzureException If the shared access signature token credential could not be created using the
+ * connection string.
+ */
+ public EventHubBufferedProducerClientBuilder connectionString(String connectionString, String eventHubName) {
+ builder.connectionString(connectionString, eventHubName);
+ return this;
+ }
+
+ /**
+ * Sets the credential information for which Event Hub instance to connect to, and how to authorize against it.
+ *
+ * @param fullyQualifiedNamespace The fully qualified name for the Event Hubs namespace. This is likely to be
+ * similar to {@literal "{your-namespace}.servicebus.windows.net"}.
+ * @param eventHubName The name of the Event Hub to connect the client to.
+ * @param credential The token credential to use for authorization. Access controls may be specified by the
+ * Event Hubs namespace or the requested Event Hub, depending on Azure configuration.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ *
+ * @throws IllegalArgumentException if {@code fullyQualifiedNamespace} or {@code eventHubName} is an empty
+ * string.
+ * @throws NullPointerException if {@code fullyQualifiedNamespace}, {@code eventHubName}, or {@code credential}
+ * is null.
+ */
+ public EventHubBufferedProducerClientBuilder credential(String fullyQualifiedNamespace, String eventHubName,
+ TokenCredential credential) {
+ builder.credential(fullyQualifiedNamespace, eventHubName, credential);
+ return this;
+ }
+
+ /**
+ * Sets the {@link TokenCredential} used to authorize requests sent to the service. Refer to the Azure SDK for Java
+ * identity and authentication documentation for more details
+ * on proper usage of the {@link TokenCredential} type.
+ *
+ * @param fullyQualifiedNamespace The fully qualified name for the Event Hubs namespace. This is likely to be
+ * similar to {@literal "{your-namespace}.servicebus.windows.net"}.
+ * @param eventHubName The name of the Event Hub to connect the client to.
+ * @param credential The token credential to use for authorization. Access controls may be specified by the
+ * Event Hubs namespace or the requested Event Hub, depending on Azure configuration.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ *
+ * @throws NullPointerException if {@code fullyQualifiedNamespace}, {@code eventHubName}, or {@code credential}
+ * is null.
+ */
+ public EventHubBufferedProducerClientBuilder credential(String fullyQualifiedNamespace, String eventHubName,
+ AzureNamedKeyCredential credential) {
+ builder.credential(fullyQualifiedNamespace, eventHubName, credential);
+ return this;
+ }
+
+ /**
+ * Sets the {@link TokenCredential} used to authorize requests sent to the service. Refer to the Azure SDK for Java
+ * identity and authentication documentation for more details
+ * on proper usage of the {@link TokenCredential} type.
+ *
+ * @param fullyQualifiedNamespace The fully qualified name for the Event Hubs namespace. This is likely to be
+ * similar to {@literal "{your-namespace}.servicebus.windows.net"}.
+ * @param eventHubName The name of the Event Hub to connect the client to.
+ * @param credential The token credential to use for authorization. Access controls may be specified by the
+ * Event Hubs namespace or the requested Event Hub, depending on Azure configuration.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ *
+ * @throws NullPointerException if {@code fullyQualifiedNamespace}, {@code eventHubName}, or {@code credential}
+ * is null.
+ */
+ public EventHubBufferedProducerClientBuilder credential(String fullyQualifiedNamespace, String eventHubName,
+ AzureSasCredential credential) {
+ builder.credential(fullyQualifiedNamespace, eventHubName, credential);
+ return this;
+ }
+
+ /**
+ * Sets a custom endpoint address when connecting to the Event Hubs service. This can be useful when your network
+ * does not allow connecting to the standard Azure Event Hubs endpoint address, but does allow connecting through an
+ * intermediary. For example: {@literal https://my.custom.endpoint.com:55300}.
+ *
+ * If no port is specified, the default port for the {@link #transportType(AmqpTransportType) transport type} is
+ * used.
+ *
+ * @param customEndpointAddress The custom endpoint address.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ *
+ * @throws IllegalArgumentException if {@code customEndpointAddress} cannot be parsed into a valid {@link URL}.
+ */
+ public EventHubBufferedProducerClientBuilder customEndpointAddress(String customEndpointAddress) {
+ builder.customEndpointAddress(customEndpointAddress);
+ return this;
+ }
+
+ /**
+ * Indicates whether events should be published using idempotent semantics for retries. If enabled, retries during
+ * publishing will attempt to avoid duplication with a minor cost to throughput. Duplicates are still possible but
+ * the chance of them occurring is much lower when idempotent retries are enabled.
+ *
+ *
+ * It is important to note that enabling idempotent retries does not guarantee exactly-once semantics. The existing
+ * Event Hubs at-least-once delivery contract still applies and event duplication is unlikely, but possible.
+ *
+ *
+ * @param enableIdempotentRetries {@code true} to enable idempotent retries, {@code false} otherwise.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ */
+ EventHubBufferedProducerClientBuilder enableIdempotentRetries(boolean enableIdempotentRetries) {
+ clientOptions.setEnableIdempotentRetries(enableIdempotentRetries);
+ return this;
+ }
+
+ /**
+ * The total number of batches that may be sent concurrently, across all partitions. This limit takes precedence
+ * over the value specified in {@link #maxConcurrentSendsPerPartition(int) maxConcurrentSendsPerPartition}, ensuring
+ * this maximum is respected. When batches for the same partition are published concurrently, the ordering of
+ * events is not guaranteed. If the order events are published must be maintained, {@link
+ * #maxConcurrentSendsPerPartition(int) maxConcurrentSendsPerPartition} should not exceed 1.
+ *
+ *
+ * By default, this will be set to the number of processors available in the host environment.
+ *
+ *
+ * @param maxConcurrentSends The total number of batches that may be sent concurrently.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ */
+ EventHubBufferedProducerClientBuilder maxConcurrentSends(int maxConcurrentSends) {
+ clientOptions.setMaxConcurrentSends(maxConcurrentSends);
+ return this;
+ }
+
+ /**
+ * The number of batches that may be sent concurrently for a given partition. This option is superseded by the
+ * value specified for {@link #maxConcurrentSends(int) maxConcurrentSends}, ensuring that limit is respected.
+ *
+ * @param maxConcurrentSendsPerPartition The number of batches that may be sent concurrently for a given
+ * partition.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ */
+ EventHubBufferedProducerClientBuilder maxConcurrentSendsPerPartition(int maxConcurrentSendsPerPartition) {
+ clientOptions.setMaxConcurrentSendsPerPartition(maxConcurrentSendsPerPartition);
+ return this;
+ }
+
+ /**
+ * The total number of events that can be buffered for publishing at a given time for a given partition. Once this
+ * capacity is reached, more events can be enqueued by calling the {@code enqueueEvent} methods on either {@link
+ * EventHubBufferedProducerClient} or {@link EventHubBufferedProducerAsyncClient}.
+ *
+ * The default limit is 1500 queued events for each partition.
+ *
+ * @param maxEventBufferLengthPerPartition Total number of events that can be buffered for publishing at a given
+ * time.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ */
+ public EventHubBufferedProducerClientBuilder maxEventBufferLengthPerPartition(int maxEventBufferLengthPerPartition) {
+ clientOptions.maxEventBufferLengthPerPartition(maxEventBufferLengthPerPartition);
+ return this;
+ }
+
+ /**
+ * The amount of time to wait for a batch to be built with events in the buffer before publishing a partially full
+ * batch.
+ *
+ * @param maxWaitTime The amount of time to wait.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ */
+ public EventHubBufferedProducerClientBuilder maxWaitTime(Duration maxWaitTime) {
+ clientOptions.setMaxWaitTime(maxWaitTime);
+ return this;
+ }
+
+ /**
+ * The callback to invoke when publishing a set of events fails.
+ *
+ * @param sendFailedContext The callback to invoke.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ */
+ public EventHubBufferedProducerClientBuilder onSendBatchFailed(
+ Consumer sendFailedContext) {
+ clientOptions.setSendFailedContext(sendFailedContext);
+ return this;
+ }
+
+ /**
+ * The callback to invoke when publishing a set of events succeeds.
+ *
+ * @param sendSucceededContext The callback to invoke when publishing a set of events succeeds.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ */
+ public EventHubBufferedProducerClientBuilder onSendBatchSucceeded(
+ Consumer sendSucceededContext) {
+ clientOptions.setSendSucceededContext(sendSucceededContext);
+ return this;
+ }
+
+ /**
+ * Sets the proxy configuration to use for the buffered producer. When a proxy is configured, {@link
+ * AmqpTransportType#AMQP_WEB_SOCKETS} must be used for the transport type.
+ *
+ * @param proxyOptions The proxy configuration to use.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ */
+ public EventHubBufferedProducerClientBuilder proxyOptions(ProxyOptions proxyOptions) {
+ builder.proxyOptions(proxyOptions);
+ return this;
+ }
+
+ /**
+ * Sets the retry policy for the producer client. If not specified, the default retry options are used.
+ *
+ * @param retryOptions The retry policy to use.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ */
+ public EventHubBufferedProducerClientBuilder retryOptions(AmqpRetryOptions retryOptions) {
+ this.retryOptions = retryOptions;
+ builder.retryOptions(retryOptions);
+ return this;
+ }
+
+ /**
+ * Sets the transport type by which all the communication with Azure Event Hubs occurs. Default value is {@link
+ * AmqpTransportType#AMQP}.
+ *
+ * @param transport The transport type to use.
+ *
+ * @return The updated {@link EventHubBufferedProducerClientBuilder} object.
+ */
+ public EventHubBufferedProducerClientBuilder transportType(AmqpTransportType transport) {
+ builder.transportType(transport);
+ return this;
+ }
+
+ /**
+ * Builds a new instance of the async buffered producer client.
+ *
+ * @return A new instance of {@link EventHubBufferedProducerAsyncClient}.
+ *
+ * @throws NullPointerException if {@link #onSendBatchSucceeded(Consumer)}, {@link
+ * #onSendBatchFailed(Consumer)}, or {@link #maxWaitTime(Duration)} are null.
+ * @throws IllegalArgumentException if {@link #maxConcurrentSends(int)}, {@link
+ * #maxConcurrentSendsPerPartition(int)}, or {@link #maxEventBufferLengthPerPartition(int)} are less than 1.
+ */
+ public EventHubBufferedProducerAsyncClient buildAsyncClient() {
+
+ if (Objects.isNull(clientOptions.getSendSucceededContext())) {
+ throw LOGGER.logExceptionAsError(new NullPointerException("'onSendBatchSucceeded' cannot be null."));
+ }
+
+ if (Objects.isNull(clientOptions.getSendFailedContext())) {
+ throw LOGGER.logExceptionAsError(new NullPointerException("'onSendBatchFailed' cannot be null."));
+ }
+
+ if (Objects.isNull(clientOptions.getMaxWaitTime())) {
+ throw LOGGER.logExceptionAsError(new NullPointerException("'maxWaitTime' cannot be null."));
+ }
+
+ if (clientOptions.getMaxEventBufferLengthPerPartition() < 1) {
+ throw LOGGER.logExceptionAsError(new IllegalArgumentException(
+ "'maxEventBufferLengthPerPartition' cannot be less than 1."));
+ }
+
+ if (clientOptions.getMaxConcurrentSends() < 1) {
+ throw LOGGER.logExceptionAsError(new IllegalArgumentException(
+ "'maxConcurrentSends' cannot be less than 1."));
+ }
+
+ if (clientOptions.getMaxConcurrentSendsPerPartition() < 1) {
+ throw LOGGER.logExceptionAsError(new IllegalArgumentException(
+ "'maxConcurrentSendsPerPartition' cannot be less than 1."));
+ }
+
+ final AmqpRetryOptions options = retryOptions == null
+ ? EventHubClientBuilder.DEFAULT_RETRY
+ : retryOptions;
+
+ return new EventHubBufferedProducerAsyncClient(builder, clientOptions, partitionResolver, options);
+ }
+
+ /**
+ * Builds a new instance of the buffered producer client.
+ *
+ * @return A new instance of {@link EventHubBufferedProducerClient}.
+ */
+ public EventHubBufferedProducerClient buildClient() {
+ final AmqpRetryOptions options = retryOptions == null
+ ? EventHubClientBuilder.DEFAULT_RETRY
+ : retryOptions;
+
+ return new EventHubBufferedProducerClient(buildAsyncClient(), options.getTryTimeout());
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
index 55cd67314eb2..6fdb45e78ce2 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java
@@ -158,6 +158,9 @@ public class EventHubClientBuilder implements
// So, limit the prefetch to just 1 by default.
static final int DEFAULT_PREFETCH_COUNT_FOR_SYNC_CLIENT = 1;
+ static final AmqpRetryOptions DEFAULT_RETRY = new AmqpRetryOptions()
+ .setTryTimeout(ClientConstants.OPERATION_TIMEOUT);
+
/**
* The name of the default consumer group in the Event Hubs service.
*/
@@ -177,8 +180,6 @@ public class EventHubClientBuilder implements
private static final String UNKNOWN = "UNKNOWN";
private static final String AZURE_EVENT_HUBS_CONNECTION_STRING = "AZURE_EVENT_HUBS_CONNECTION_STRING";
- private static final AmqpRetryOptions DEFAULT_RETRY = new AmqpRetryOptions()
- .setTryTimeout(ClientConstants.OPERATION_TIMEOUT);
private static final Pattern HOST_PORT_PATTERN = Pattern.compile("^[^:]+:\\d+");
private static final ClientLogger LOGGER = new ClientLogger(EventHubClientBuilder.class);
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java
new file mode 100644
index 000000000000..04a327bfcd77
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java
@@ -0,0 +1,248 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs;
+
+import com.azure.core.util.logging.ClientLogger;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Allows events to be resolved to a partition using common patterns such as round-robin assignment and hashing of
+ * partitions keys.
+ */
+class PartitionResolver {
+ private static final ClientLogger LOGGER = new ClientLogger(PartitionResolver.class);
+ private static final int STARTING_INDEX = -1;
+
+ private final AtomicInteger partitionAssignmentIndex = new AtomicInteger(STARTING_INDEX);
+
+ /**
+ * Assigns a partition using a round-robin approach.
+ *
+ * @param partitions The set of available partitions.
+ *
+ * @return The identifier of the selected partition from the available set.
+ */
+ String assignRoundRobin(String[] partitions) {
+ Objects.requireNonNull(partitions, "'partitions' cannot be null.");
+
+ if (partitions.length == 0) {
+ throw LOGGER.logExceptionAsError(new IllegalArgumentException("'partitions' cannot be empty."));
+ }
+
+ final int currentIndex = partitionAssignmentIndex.accumulateAndGet(1,
+ (current, added) -> {
+ try {
+ return Math.addExact(current, added);
+ } catch (ArithmeticException e) {
+ LOGGER.info("Overflowed incrementing index. Rolling over.", e);
+
+ return STARTING_INDEX + added;
+ }
+ });
+
+ return partitions[(currentIndex % partitions.length)];
+ }
+
+ /**
+ * Assigns a partition using a hash-based approach based on the provided {@code partitionKey}.
+ *
+ * @param partitionKey The partition key.
+ * @param partitions The set of available partitions.
+ *
+ * @return The identifier of the selected partition from the available set.
+ */
+ String assignForPartitionKey(String partitionKey, String[] partitions) {
+ final short hashValue = generateHashCode(partitionKey);
+ final int index = Math.abs(hashValue % partitions.length);
+
+ return partitions[index];
+ }
+
+ /**
+ * Generates a hashcode for the partition key using Jenkins' lookup3 algorithm.
+ *
+ * This implementation is a direct port of the Event Hubs service code; it is intended to match the gateway hashing
+ * algorithm as closely as possible and should not be adjusted without careful consideration.
+ *
+ * @param partitionKey The partition key.
+ *
+ * @return The generated hash code.
+ */
+ static short generateHashCode(String partitionKey) {
+ if (partitionKey == null) {
+ return 0;
+ }
+
+ final byte[] bytes = partitionKey.getBytes(StandardCharsets.UTF_8);
+
+ final Hashed hashed = computeHash(bytes, 0, 0);
+ final int i = hashed.getHash1() ^ hashed.getHash2();
+
+ return Integer.valueOf(i).shortValue();
+ }
+
+ /**
+ * Computes a hash value using Jenkins' lookup3 algorithm.
+ *
+ * This implementation is a direct port of the Event Hubs service code; it is intended to match the gateway hashing
+ * algorithm as closely as possible and should not be adjusted without careful consideration.
+ *
+ * NOTE: Suppressing fallthrough warning for switch-case because we want it to fall into the subsequent cases.
+ *
+ * @param data The data to base the hash on.
+ * @param seed1 Seed value for the first hash.
+ * @param seed2 Seed value for the second hash.
+ *
+ * @return An object containing the computed hash for {@code seed1} and {@code seed2}.
+ */
+ @SuppressWarnings("fallthrough")
+ private static Hashed computeHash(byte[] data, int seed1, int seed2) {
+ int a = (0xdeadbeef + data.length + seed1);
+ int b = a;
+ int c = a + seed2;
+
+ final ByteBuffer buffer = ByteBuffer.allocate(data.length)
+ .put(data);
+
+ buffer.flip();
+ buffer.order(ByteOrder.LITTLE_ENDIAN);
+
+ int index = 0;
+ int size = data.length;
+
+ while (size > 12) {
+ a += buffer.getInt(index);
+ b += buffer.getInt(index + 4);
+ c += buffer.getInt(index + 8);
+
+ a -= c;
+ a ^= (c << 4) | (c >>> 28);
+ c += b;
+
+ b -= a;
+ b ^= (a << 6) | (a >>> 26);
+ a += c;
+
+ c -= b;
+ c ^= (b << 8) | (b >>> 24);
+ b += a;
+
+ a -= c;
+ a ^= (c << 16) | (c >>> 16);
+ c += b;
+
+ b -= a;
+ b ^= (a << 19) | (a >>> 13);
+ a += c;
+
+ c -= b;
+ c ^= (b << 4) | (b >>> 28);
+ b += a;
+
+ index += 12;
+ size -= 12;
+ }
+
+ switch (size) {
+ case 12:
+ a += buffer.getInt(index);
+ b += buffer.getInt(index + 4);
+ c += buffer.getInt(index + 8);
+ break;
+
+ // fallthrough
+ case 11:
+ c += data[index + 10] << 16;
+ // fallthrough
+ case 10:
+ c += data[index + 9] << 8;
+ // fallthrough
+ case 9:
+ c += data[index + 8];
+ // fallthrough
+ case 8:
+ b += buffer.getInt(index + 4);
+ a += buffer.getInt(index);
+ break;
+
+ // fallthrough
+ case 7:
+ b += data[index + 6] << 16;
+ // fallthrough
+ case 6:
+ b += data[index + 5] << 8;
+ // fallthrough
+ case 5:
+ b += data[index + 4];
+ // fallthrough
+ case 4:
+ a += buffer.getInt(index);
+ break;
+
+ // fallthrough
+ case 3:
+ a += data[index + 2] << 16;
+ // fallthrough
+ case 2:
+ a += data[index + 1] << 8;
+ // fallthrough
+ case 1:
+ a += data[index];
+ break;
+ case 0:
+ return new Hashed(c, b);
+ default:
+ break;
+ }
+
+ c ^= b;
+ c -= (b << 14) | (b >>> 18);
+
+ a ^= c;
+ a -= (c << 11) | (c >>> 21);
+
+ b ^= a;
+ b -= (a << 25) | (a >>> 7);
+
+ c ^= b;
+ c -= (b << 16) | (b >>> 16);
+
+ a ^= c;
+ a -= (c << 4) | (c >>> 28);
+
+ b ^= a;
+ b -= (a << 14) | (a >>> 18);
+
+ c ^= b;
+ c -= (b << 24) | (b >>> 8);
+
+ return new Hashed(c, b);
+ }
+
+ /**
+ * Class that holds the hash values from the lookup algorithm.
+ */
+ private static class Hashed {
+ private final int hash1;
+ private final int hash2;
+
+ Hashed(int hash1, int hash2) {
+ this.hash1 = hash1;
+ this.hash2 = hash2;
+ }
+
+ public int getHash1() {
+ return hash1;
+ }
+
+ public int getHash2() {
+ return hash2;
+ }
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ClientConstants.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ClientConstants.java
index 80d8ba4bee7b..c18499b906bb 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ClientConstants.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ClientConstants.java
@@ -20,6 +20,7 @@ public final class ClientConstants {
public static final String ENTITY_PATH_KEY = "entityPath";
public static final String SIGNAL_TYPE_KEY = "signalType";
public static final String CLIENT_IDENTIFIER_KEY = "clientIdentifier";
+ public static final String EMIT_RESULT_KEY = "emitResult";
// EventHubs specific logging context keys
public static final String PARTITION_ID_KEY = "partitionId";
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/SendBatchFailedContext.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/SendBatchFailedContext.java
new file mode 100644
index 000000000000..f8af315312ff
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/SendBatchFailedContext.java
@@ -0,0 +1,61 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs.models;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient;
+import com.azure.messaging.eventhubs.EventHubBufferedProducerClient;
+
+/**
+ * Contains information about a batch that was unable to be published, as well as the exception that occurred and the
+ * partition that the batch was being published to.
+ *
+ * @see EventHubBufferedProducerClient
+ * @see EventHubBufferedProducerAsyncClient
+ */
+public final class SendBatchFailedContext {
+ private final Iterable events;
+ private final String partitionId;
+ private final Throwable throwable;
+
+ /**
+ * Creates a new instance.
+ *
+ * @param events Events associated with the failed batch.
+ * @param partitionId Partition that the events went to.
+ * @param throwable Error associated with the failed batch.
+ */
+ public SendBatchFailedContext(Iterable events, String partitionId, Throwable throwable) {
+ this.events = events;
+ this.partitionId = partitionId;
+ this.throwable = throwable;
+ }
+
+ /**
+ * Gets the events that failed to send.
+ *
+ * @return The events that failed to send.
+ */
+ public Iterable getEvents() {
+ return events;
+ }
+
+ /**
+ * Gets the partition id that the failed batch went to.
+ *
+ * @return The partition id that the failed batch went to.
+ */
+ public String getPartitionId() {
+ return partitionId;
+ }
+
+ /**
+ * Gets the error that occurred when sending the batch.
+ *
+ * @return The error that occurred when sending the batch.
+ */
+ public Throwable getThrowable() {
+ return throwable;
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/SendBatchSucceededContext.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/SendBatchSucceededContext.java
new file mode 100644
index 000000000000..5a853d465cba
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/SendBatchSucceededContext.java
@@ -0,0 +1,48 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs.models;
+
+import com.azure.messaging.eventhubs.EventData;
+import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient;
+import com.azure.messaging.eventhubs.EventHubBufferedProducerClient;
+
+/**
+ * Contains information about a batch that was published and the partition that it was published to.
+ *
+ * @see EventHubBufferedProducerAsyncClient
+ * @see EventHubBufferedProducerClient
+ */
+public final class SendBatchSucceededContext {
+ private final String partitionId;
+ private final Iterable<EventData> events;
+
+ /**
+ * Initializes a new instance of the class.
+ *
+ * @param events The set of events in the batch that was published.
+ * @param partitionId The identifier of the partition that the batch was published to.
+ */
+ public SendBatchSucceededContext(Iterable<EventData> events, String partitionId) {
+ this.events = events;
+ this.partitionId = partitionId;
+ }
+
+ /**
+ * Gets the set of events in the batch that was published.
+ *
+ * @return The set of events in the batch that was published.
+ */
+ public Iterable<EventData> getEvents() {
+ return events;
+ }
+
+ /**
+ * Gets the identifier of the partition that the batch was published to.
+ *
+ * @return The identifier of the partition that the batch was published to.
+ */
+ public String getPartitionId() {
+ return partitionId;
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java
new file mode 100644
index 000000000000..6c731d2bb858
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java
@@ -0,0 +1,233 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link PartitionResolver}. Taken from .NET's to confirm cross-language compatibility.
+ *
+ * @see PartitionResolverTests.cs
+ */
+public class PartitionResolverTests {
+ public static Stream<List<String>> partitionSetTestCases() {
+ final ArrayList<List<String>> arguments = new ArrayList<>();
+
+ for (int index = 1; index < 8; ++index) {
+ final List<String> partitions = IntStream.range(0, index).mapToObj(String::valueOf)
+ .collect(Collectors.toList());
+
+ arguments.add(partitions);
+ }
+
+ // Build sets for 16, 32, and 2000 partitions for more extreme cases.
+
+ for (int count : new int[]{16, 32, 2000}) {
+ final List<String> partitions = IntStream.range(0, count).mapToObj(String::valueOf)
+ .collect(Collectors.toList());
+
+ arguments.add(partitions);
+ }
+
+ return arguments.stream();
+ }
+
+ public static Stream<Arguments> partitionHashTestCases() {
+
+ final ArrayList<Arguments> arguments = new ArrayList<>();
+
+ arguments.add(Arguments.of("7", (short) -15263));
+ arguments.add(Arguments.of("131", (short) 30562));
+ arguments.add(Arguments.of("7149583486996073602", (short) 12977));
+ arguments.add(Arguments.of("FWfAT", (short) -22341));
+ arguments.add(Arguments.of("sOdeEAsyQoEuEFPGerWO", (short) -6503));
+ arguments.add(Arguments.of(
+ "FAyAIctPeCgmiwLKbJcyswoHglHVjQdvtBowLACDNORsYvOcLddNJYDmhAVkbyLOrHTKLneMNcbgWVlasVywOByANjs",
+ (short) 5226));
+ arguments.add(Arguments.of("1XYM6!(7(lF5wq4k4m*e$Nc!1ezLJv*1YK1Y-C^*&B$O)lq^iUkG(TNzXG;Zi#z2Og*Qq0#^*k)"
+ + ":vXh$3,C7We7%W0meJ;b3,rQCg^J;^twXgs5E$$hWKxqp", (short) 23950));
+ arguments.add(Arguments.of("E(x;RRIaQcJs*P;D&jTPau-4K04oqr:lF6Z):ERpo&;"
+ + "9040qyV@G1_c9mgOs-8_8/10Fwa-7b7-yP!T-!IH&968)FWuI;(^g$2fN;)HJ^^yTn:", (short) -29304));
+ arguments.add(Arguments.of("!c*_!I@1^c", (short) 15372));
+ arguments.add(Arguments.of("p4*!jioeO/z-!-;w:dh", (short) -3104));
+ arguments.add(Arguments.of("$0cb", (short) 26269));
+ arguments.add(Arguments.of("-4189260826195535198", (short) 453));
+
+ return arguments.stream();
+ }
+
+ /**
+ * Tests that events are distributed equally given the set of partitions.
+ *
+ * @param partitionsList List of partitions to distribute all events between.
+ */
+ @ParameterizedTest
+ @MethodSource("partitionSetTestCases")
+ public void distributesRoundRobinFairly(List<String> partitionsList) {
+ // Arrange
+ final String[] partitions = partitionsList.toArray(new String[0]);
+ final PartitionResolver resolver = new PartitionResolver();
+
+ // Act & Assert
+ for (int i = 0; i < 100; i++) {
+ final String expected = partitions[i % partitions.length];
+ final String actual = resolver.assignRoundRobin(partitions);
+
+ assertEquals(expected, actual, "The assignment was unexpected for index: " + i);
+ }
+ }
+
+ @ParameterizedTest
+ @MethodSource("partitionSetTestCases")
+ public void distributesRoundRobinFairlyConcurrent(List<String> partitionsList) {
+ // Arrange
+ final String[] partitions = partitionsList.toArray(new String[0]);
+
+ final int concurrentCount = 4;
+ final int assignmentsPerPartition = 20;
+ final int expectedAssignmentCount = (concurrentCount * assignmentsPerPartition);
+ final int iterationCount = partitions.length * assignmentsPerPartition;
+
+ final PartitionResolver resolver = new PartitionResolver();
+ final ArrayList<String> assigned = new ArrayList<>();
+ final ArrayList<Mono<Void>> activeTasks = new ArrayList<>();
+
+ // Create a function that assigns partitions in a loop and track them.
+ Mono<Void> roundRobin = Mono.fromRunnable(() -> {
+ for (int index = 0; index < iterationCount; index++) {
+ assigned.add(resolver.assignRoundRobin(partitions));
+ }
+ });
+
+ // Create concurrent round-robin tasks.
+ IntStream.range(0, concurrentCount).forEach(index -> activeTasks.add(roundRobin));
+
+ // Assert
+ // Wait for them all to complete.
+ StepVerifier.create(Mono.when(activeTasks))
+ .verifyComplete();
+
+ // Assert
+
+ // When grouped, the count of each partition should equal the iteration count for each
+ // concurrent invocation.
+ final HashMap<String, Integer> partitionAssignments = assigned.stream().collect(HashMap::new,
+ (map, value) -> map.compute(value, (key, existingValue) -> existingValue == null ? 1 : (existingValue + 1)),
+ (map1, map2) -> {
+ map2.forEach((key, value) -> {
+ map1.compute(key, (existingKey, existingValue) -> {
+ // It did not exist in map1, so we use the total from map2. Otherwise, combine the two.
+ return existingValue == null ? value : (existingValue + value);
+ });
+ });
+ });
+
+ // Verify that each assignment is for a valid partition and has the expected distribution.
+ partitionAssignments.forEach((partitionId, numberAssigned) -> {
+ assertEquals(expectedAssignmentCount, numberAssigned,
+ String.format("The count for key: [%s] should match the total iterations.", partitionId));
+ });
+
+ // Verify that all partitions were assigned.
+ for (String id : partitions) {
+ assertTrue(partitionAssignments.containsKey(id), "Partition " + id + " should have had an assignment.");
+ }
+ }
+
+ /**
+ * Verifies that the same partition key is assigned to the same partition id.
+ *
+ * @param partitionsList List of partitions.
+ */
+ @ParameterizedTest
+ @MethodSource("partitionSetTestCases")
+ public void partitionKeyAssignmentIsStable(List<String> partitionsList) {
+ // Arrange
+ final String[] partitions = partitionsList.toArray(new String[0]);
+
+ final int iterationCount = 25;
+ final String key = "this-is-a-key-1";
+ final PartitionResolver resolver = new PartitionResolver();
+ final String expected = resolver.assignForPartitionKey(key, partitions);
+
+ // Act & Assert
+ IntStream.range(0, iterationCount).forEach(index -> {
+ final String actual = resolver.assignForPartitionKey(key, partitions);
+ assertEquals(expected, actual, "The assignment for iteration: [" + index + "] was unstable.");
+ });
+ }
+
+ @ParameterizedTest
+ @MethodSource("partitionSetTestCases")
+ public void partitionKeyAssignmentDistributesKeysToDifferentPartitions(List<String> partitionsList) {
+ // Arrange
+ final String[] partitions = partitionsList.toArray(new String[0]);
+
+ final int keyLength = 20;
+ final int requiredAssignments = (int) Math.floor(partitions.length * 0.67);
+ final HashSet<String> assignedHash = new HashSet<>();
+ final PartitionResolver resolver = new PartitionResolver();
+
+ // Create the random number generator using a constant seed; this is
+ // intended to allow for randomization but will also keep a consistent
+ // pattern each time the tests are run.
+ final Random random = new Random(412);
+
+ for (int index = 0; index < Integer.MAX_VALUE; index++) {
+ final StringBuilder keyBuilder = new StringBuilder(keyLength);
+
+ for (int charIndex = 0; charIndex < keyLength; charIndex++) {
+ keyBuilder.append((char) random.nextInt(256));
+ }
+
+ final String key = keyBuilder.toString();
+ final String partition = resolver.assignForPartitionKey(key, partitions);
+
+ assignedHash.add(partition);
+
+ // If keys were distributed to more than one partition and the minimum number of
+ // iterations was satisfied, break the loop.
+
+ if (assignedHash.size() > requiredAssignments) {
+ break;
+ }
+ }
+
+ assertTrue(assignedHash.size() >= requiredAssignments, String.format(
+ "Partition keys should have had some level of distribution among partitions. Assigned: %d. Required: %d",
+ assignedHash.size(), requiredAssignments));
+ }
+
+ /**
+ * Verifies functionality of hash code generation for the {@link PartitionResolver}.
+ *
+ * @param partitionKey Partition key
+ * @param expectedHash Expected value
+ */
+ @ParameterizedTest(name = "{index} Partition Key: {0}")
+ @MethodSource("partitionHashTestCases")
+ public void hashCodeAssignmentIsStable(String partitionKey, short expectedHash) {
+ // Act
+ short actual = PartitionResolver.generateHashCode(partitionKey);
+
+ // Assert
+ assertEquals(expectedHash, actual);
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java
new file mode 100644
index 000000000000..c50f9d4b94a6
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java
@@ -0,0 +1,310 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs;
+
+import com.azure.core.amqp.exception.AmqpErrorCondition;
+import com.azure.core.amqp.exception.AmqpException;
+import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import reactor.test.StepVerifier;
+import reactor.test.publisher.TestPublisher;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+public class EventDataAggregatorTest {
+ private static final String NAMESPACE = "test.namespace";
+ private static final String PARTITION_ID = "test-id";
+
+ private AutoCloseable mockCloseable;
+
+ @Mock
+ private EventDataBatch batch;
+
+ @Mock
+ private EventDataBatch batch2;
+
+ @Mock
+ private EventDataBatch batch3;
+
+ private final EventData event1 = new EventData("foo");
+ private final EventData event2 = new EventData("bar");
+ private final EventData event3 = new EventData("baz");
+
+ @BeforeEach
+ public void beforeEach() {
+ mockCloseable = MockitoAnnotations.openMocks(this);
+ }
+
+ @AfterEach
+ public void afterEach() throws Exception {
+ if (mockCloseable != null) {
+ mockCloseable.close();
+ }
+
+ Mockito.framework().clearInlineMock(this);
+ }
+
+ /**
+ * Tests that it pushes full batches downstream (when tryAdd returns false). Also, publishes any batches downstream
+ * when upstream completes.
+ */
+ @Test
+ public void pushesFullBatchesDownstream() {
+ // Arrange
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents, event1, event2);
+
+ final List<EventData> batchEvents2 = new ArrayList<>();
+ setupBatchMock(batch2, batchEvents2, event3);
+
+ final Duration waitTime = Duration.ofSeconds(30);
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(waitTime);
+
+ final AtomicInteger first = new AtomicInteger(0);
+ final Supplier<EventDataBatch> supplier = () -> {
+ final int current = first.getAndIncrement();
+
+ switch (current) {
+ case 0:
+ return batch;
+ case 1:
+ return batch2;
+ default:
+ throw new RuntimeException("pushesFullBatchesDownstream: Did not expect to get this many invocations.");
+ }
+ };
+
+ final TestPublisher<EventData> publisher = TestPublisher.createCold();
+ final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options, PARTITION_ID);
+
+ // Act & Assert
+ StepVerifier.create(aggregator)
+ .then(() -> {
+ publisher.next(event1, event2, event3);
+ })
+ .expectNext(batch)
+ .then(() -> publisher.complete())
+ .expectNext(batch2)
+ .expectComplete()
+ .verify(Duration.ofSeconds(10));
+ }
+
+ /**
+ * Tests that it pushes partial batches downstream when an error occurs.
+ */
+ @Test
+ public void pushesBatchesAndError() {
+ // Arrange
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents, event1, event2);
+
+ final Duration waitTime = Duration.ofSeconds(30);
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(waitTime);
+
+ final AtomicBoolean first = new AtomicBoolean();
+ final Supplier<EventDataBatch> supplier = () -> {
+ if (first.compareAndSet(false, true)) {
+ return batch;
+ } else {
+ return batch2;
+ }
+ };
+
+ final TestPublisher<EventData> publisher = TestPublisher.createCold();
+ final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options, PARTITION_ID);
+ final IllegalArgumentException testException = new IllegalArgumentException("Test exception.");
+
+ // Act & Assert
+ StepVerifier.create(aggregator)
+ .then(() -> {
+ publisher.next(event1, event2);
+ publisher.error(testException);
+ })
+ .expectNext(batch)
+ .expectErrorMatches(e -> e.equals(testException))
+ .verify(Duration.ofSeconds(10));
+
+ // Verify that these events were added to the batch.
+ assertEquals(2, batchEvents.size());
+ assertTrue(batchEvents.contains(event1));
+ assertTrue(batchEvents.contains(event2));
+ }
+
+ /**
+ * Tests that batches are pushed downstream when max wait time has elapsed.
+ */
+ @Test
+ public void pushesBatchAfterMaxTime() {
+ // Arrange
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents, event1, event2);
+
+ final List<EventData> batchEvents2 = new ArrayList<>();
+ setupBatchMock(batch2, batchEvents2, event1, event2, event3);
+
+ final Duration waitTime = Duration.ofSeconds(5);
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(waitTime);
+
+ final AtomicInteger first = new AtomicInteger(0);
+ final Supplier<EventDataBatch> supplier = () -> {
+ final int current = first.getAndIncrement();
+
+ switch (current) {
+ case 0:
+ return batch;
+ case 1:
+ return batch2;
+ default:
+ System.out.println("Invoked get batch for the xth time:" + current);
+ return batch3;
+ }
+ };
+
+ final TestPublisher<EventData> publisher = TestPublisher.createCold();
+ final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options, PARTITION_ID);
+
+ // Act & Assert
+ StepVerifier.create(aggregator)
+ .then(() -> {
+ publisher.next(event1);
+ publisher.next(event2);
+ })
+ .thenAwait(waitTime)
+ .assertNext(b -> {
+ assertEquals(b, batch);
+ assertEquals(2, batchEvents.size());
+ })
+ .thenCancel()
+ .verify();
+ }
+
+ /**
+ * Verifies that an error is propagated when it is too large for the link.
+ */
+ @Test
+ public void errorsOnEventThatDoesNotFit() {
+ // Arrange
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents);
+
+ final Duration waitTime = Duration.ofSeconds(30);
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(waitTime);
+
+ final AtomicBoolean first = new AtomicBoolean();
+ final Supplier<EventDataBatch> supplier = () -> {
+ if (first.compareAndSet(false, true)) {
+ return batch;
+ } else {
+ throw new IllegalArgumentException("Did not expect another batch call.");
+ }
+ };
+
+ final TestPublisher<EventData> publisher = TestPublisher.createCold();
+ final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options, PARTITION_ID);
+
+ StepVerifier.create(aggregator)
+ .then(() -> {
+ publisher.next(event1);
+ })
+ .consumeErrorWith(error -> {
+ assertTrue(error instanceof AmqpException);
+ assertEquals(AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, ((AmqpException) error).getErrorCondition());
+ })
+ .verify(Duration.ofSeconds(20));
+ }
+
+ /**
+ * Verifies that backpressure requests are supported.
+ */
+ @Test
+ public void respectsBackpressure() {
+ // Arrange
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents, event1);
+
+ final List<EventData> batchEvents2 = new ArrayList<>();
+ setupBatchMock(batch2, batchEvents2);
+
+ final Duration waitTime = Duration.ofSeconds(3);
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(waitTime);
+
+ final AtomicInteger first = new AtomicInteger(0);
+ final Supplier<EventDataBatch> supplier = () -> {
+ final int current = first.getAndIncrement();
+
+ switch (current) {
+ case 0:
+ return batch;
+ case 1:
+ return batch2;
+ default:
+ throw new RuntimeException("respectsBackpressure: Did not expect to get this many invocations.");
+ }
+ };
+
+ final TestPublisher<EventData> publisher = TestPublisher.createCold();
+ final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options, PARTITION_ID);
+
+ final long request = 1L;
+
+ StepVerifier.create(aggregator, request)
+ .then(() -> publisher.next(event1))
+ .assertNext(b -> {
+ assertEquals(1, b.getCount());
+ assertTrue(b.getEvents().contains(event1));
+ })
+ .expectNoEvent(waitTime)
+ .thenCancel()
+ .verify();
+
+ publisher.assertMaxRequested(request);
+ }
+
+ /**
+ * Helper method to set up mocked {@link EventDataBatch} to accept the given events in {@code resultSet}.
+ *
+ * @param batch Mocked batch.
+ * @param resultSet List to store added EventData in.
+ * @param acceptedEvents EventData that is accepted by the batch when {@link EventDataBatch#tryAdd(EventData)}
+ * is invoked.
+ */
+ static void setupBatchMock(EventDataBatch batch, List<EventData> resultSet, EventData... acceptedEvents) {
+ when(batch.tryAdd(any(EventData.class))).thenAnswer(invocation -> {
+ final EventData arg = invocation.getArgument(0);
+
+ final boolean matches = Arrays.asList(acceptedEvents).contains(arg);
+
+ if (matches) {
+ resultSet.add(arg);
+ }
+
+ return matches;
+ });
+ when(batch.getEvents()).thenAnswer(invocation -> {
+ return resultSet;
+ });
+ when(batch.getCount()).thenAnswer(invocation -> resultSet.size());
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java
new file mode 100644
index 000000000000..b3c20b7d3f44
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java
@@ -0,0 +1,376 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs;
+
+import com.azure.core.amqp.AmqpRetryOptions;
+import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions;
+import com.azure.messaging.eventhubs.models.CreateBatchOptions;
+import com.azure.messaging.eventhubs.models.SendBatchFailedContext;
+import com.azure.messaging.eventhubs.models.SendBatchSucceededContext;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Isolated;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import static com.azure.messaging.eventhubs.EventDataAggregatorTest.setupBatchMock;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests {@link EventHubBufferedPartitionProducer}
+ */
+@Isolated
+public class EventHubBufferedPartitionProducerTest {
+ private static final String PARTITION_ID = "10";
+ private static final String NAMESPACE = "test-eventhubs-namespace";
+ private static final String EVENT_HUB_NAME = "test-hub";
+ private static final List<String> PARTITION_IDS = Arrays.asList("one", "two", PARTITION_ID, "four");
+ private static final AmqpRetryOptions DEFAULT_RETRY_OPTIONS = new AmqpRetryOptions();
+
+ private AutoCloseable mockCloseable;
+
+ private final Semaphore successSemaphore = new Semaphore(1);
+ private final Semaphore failedSemaphore = new Semaphore(1);
+ private final EventData event1 = new EventData("foo");
+ private final EventData event2 = new EventData("bar");
+ private final EventData event3 = new EventData("baz");
+ private final EventData event4 = new EventData("bart");
+
+ private final Queue<EventDataBatch> returnedBatches = new LinkedList<>();
+
+ @Mock
+ private EventHubProducerAsyncClient client;
+
+ @Mock
+ private EventDataBatch batch;
+
+ @Mock
+ private EventDataBatch batch2;
+
+ @Mock
+ private EventDataBatch batch3;
+
+ @Mock
+ private EventDataBatch batch4;
+
+ @Mock
+ private EventDataBatch batch5;
+
+ @BeforeEach
+ public void beforeEach() {
+ mockCloseable = MockitoAnnotations.openMocks(this);
+
+ returnedBatches.add(batch);
+ returnedBatches.add(batch2);
+ returnedBatches.add(batch3);
+ returnedBatches.add(batch4);
+ returnedBatches.add(batch5);
+
+ when(client.getFullyQualifiedNamespace()).thenReturn(NAMESPACE);
+ when(client.getEventHubName()).thenReturn(EVENT_HUB_NAME);
+ when(client.getPartitionIds()).thenReturn(Flux.fromIterable(PARTITION_IDS));
+
+ when(client.createBatch(any(CreateBatchOptions.class))).thenAnswer(invocation -> {
+ final EventDataBatch returned = returnedBatches.poll();
+ assertNotNull(returned, "there should be more batches to be returned.");
+ return Mono.just(returned);
+ });
+ }
+
+ @AfterEach
+ public void afterEach() throws Exception {
+ if (mockCloseable != null) {
+ mockCloseable.close();
+ }
+
+ Mockito.framework().clearInlineMock(this);
+ }
+
+ @Test
+ public void publishesEvents() throws InterruptedException {
+ // Arrange
+ successSemaphore.acquire();
+ failedSemaphore.acquire();
+
+ final InvocationHolder holder = new InvocationHolder();
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(Duration.ofSeconds(5));
+ options.setSendSucceededContext(holder::onSucceed);
+ options.setSendFailedContext(holder::onFailed);
+
+ final Duration waitTime = options.getMaxWaitTime().plus(options.getMaxWaitTime());
+
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents, event1, event2);
+
+ when(client.send(any(EventDataBatch.class))).thenReturn(Mono.empty());
+
+ final EventHubBufferedPartitionProducer producer = new EventHubBufferedPartitionProducer(client, PARTITION_ID,
+ options, DEFAULT_RETRY_OPTIONS);
+
+ // Act & Assert
+ StepVerifier.create(producer.enqueueEvent(event1))
+ .expectComplete()
+ .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout());
+
+ StepVerifier.create(producer.enqueueEvent(event2))
+ .expectComplete()
+ .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout());
+
+ assertTrue(successSemaphore.tryAcquire(waitTime.toMillis(), TimeUnit.MILLISECONDS),
+ "Should have been able to get a successful batch pushed downstream.");
+
+ assertEquals(1, holder.succeededContexts.size());
+
+ final SendBatchSucceededContext first = holder.succeededContexts.get(0);
+ assertEquals(PARTITION_ID, first.getPartitionId());
+ assertEquals(batchEvents, first.getEvents());
+
+ assertEquals(2, batchEvents.size());
+
+ assertTrue(holder.failedContexts.isEmpty());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void publishesErrors() throws InterruptedException {
+ // Arrange
+ successSemaphore.acquire();
+ failedSemaphore.acquire();
+
+ final InvocationHolder holder = new InvocationHolder();
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(Duration.ofSeconds(5));
+ options.setSendSucceededContext(holder::onSucceed);
+ options.setSendFailedContext(holder::onFailed);
+
+ final Duration waitTime = options.getMaxWaitTime().plus(options.getMaxWaitTime());
+
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents, event1, event2);
+
+ final List<EventData> batchEvents2 = new ArrayList<>();
+ setupBatchMock(batch2, batchEvents2, event3, event4);
+
+ final Throwable error = new IllegalStateException("test-options.");
+ when(client.send(any(EventDataBatch.class))).thenReturn(Mono.empty(), Mono.error(error));
+
+ final EventHubBufferedPartitionProducer producer = new EventHubBufferedPartitionProducer(client, PARTITION_ID,
+ options, DEFAULT_RETRY_OPTIONS);
+
+ // Act & Assert
+ StepVerifier.create(Mono.when(producer.enqueueEvent(event1), producer.enqueueEvent(event2)))
+ .expectComplete()
+ .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout());
+
+ StepVerifier.create(Mono.when(producer.enqueueEvent(event3)))
+ .thenAwait(options.getMaxWaitTime())
+ .expectComplete()
+ .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout());
+
+ assertTrue(successSemaphore.tryAcquire(waitTime.toMillis(), TimeUnit.MILLISECONDS),
+ "Should have been able to get a successful signal downstream.");
+
+ assertTrue(failedSemaphore.tryAcquire(waitTime.toMillis(), TimeUnit.MILLISECONDS),
+ "Should have been able to get a successful error downstream.");
+
+ assertEquals(1, holder.succeededContexts.size());
+
+ final SendBatchSucceededContext first = holder.succeededContexts.get(0);
+ assertEquals(PARTITION_ID, first.getPartitionId());
+ assertEquals(batchEvents, first.getEvents());
+
+ assertEquals(2, batchEvents.size());
+
+ assertEquals(1, holder.failedContexts.size());
+ }
+
+ /**
+ * Checks that after an error publishing one batch, it can still publish subsequent batches successfully.
+ */
+ @Test
+ public void canPublishAfterErrors() throws InterruptedException {
+ // Arrange
+ final CountDownLatch success = new CountDownLatch(2);
+ failedSemaphore.acquire();
+
+ final InvocationHolder holder = new InvocationHolder();
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(Duration.ofSeconds(5));
+ options.setSendSucceededContext(context -> {
+ holder.onSucceed(context);
+ success.countDown();
+ });
+ options.setSendFailedContext(context -> holder.onFailed(context));
+
+ final Duration waitTime = options.getMaxWaitTime().plus(options.getMaxWaitTime());
+
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents, event1, event2);
+
+ final List<EventData> batchEvents2 = new ArrayList<>();
+ setupBatchMock(batch2, batchEvents2, event3);
+
+ final List<EventData> batchEvents3 = new ArrayList<>();
+ final EventData event5 = new EventData("five");
+ setupBatchMock(batch3, batchEvents3, event4, event5);
+
+ final Throwable error = new IllegalStateException("test-options.");
+
+ final Queue<Mono<Void>> responses = new LinkedList<>();
+ responses.add(Mono.empty());
+ responses.add(Mono.error(error));
+ responses.add(Mono.empty());
+ when(client.send(any(EventDataBatch.class))).thenAnswer(invocation -> {
+ return responses.poll();
+ });
+
+ final EventHubBufferedPartitionProducer producer = new EventHubBufferedPartitionProducer(client, PARTITION_ID,
+ options, DEFAULT_RETRY_OPTIONS);
+
+ // Act & Assert
+ StepVerifier.create(Mono.when(producer.enqueueEvent(event1), producer.enqueueEvent(event2)))
+ .expectComplete()
+ .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout());
+
+ StepVerifier.create(producer.enqueueEvent(event3))
+ .thenAwait(options.getMaxWaitTime())
+ .expectComplete()
+ .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout());
+
+ StepVerifier.create(Mono.when(producer.enqueueEvent(event4), producer.enqueueEvent(event5)))
+ .thenAwait(options.getMaxWaitTime())
+ .expectComplete()
+ .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout());
+
+ assertTrue(success.await(waitTime.toMillis(), TimeUnit.MILLISECONDS),
+ "Should have been able to get a successful signal downstream.");
+
+ assertTrue(failedSemaphore.tryAcquire(waitTime.toMillis(), TimeUnit.MILLISECONDS),
+ "Should have been able to get a successful error downstream.");
+
+ assertEquals(2, holder.succeededContexts.size());
+
+ // Verify the completed ones.
+ final SendBatchSucceededContext first = holder.succeededContexts.get(0);
+ assertEquals(PARTITION_ID, first.getPartitionId());
+ assertEquals(batchEvents, first.getEvents());
+
+ final SendBatchSucceededContext second = holder.succeededContexts.get(1);
+ assertEquals(PARTITION_ID, second.getPartitionId());
+ assertEquals(batchEvents3, second.getEvents());
+
+ // Verify the failed ones.
+ assertEquals(1, holder.failedContexts.size());
+ }
+
+ @Test
+ public void getBufferedEventCounts() throws InterruptedException {
+ // Arrange
+ final CountDownLatch success = new CountDownLatch(1);
+ failedSemaphore.acquire();
+
+ final InvocationHolder holder = new InvocationHolder();
+ final BufferedProducerClientOptions options = new BufferedProducerClientOptions();
+ options.setMaxWaitTime(Duration.ofSeconds(5));
+ options.setSendSucceededContext(context -> {
+ System.out.println("Batch received.");
+ holder.onSucceed(context);
+ success.countDown();
+ });
+ options.setSendFailedContext(context -> holder.onFailed(context));
+
+ final List<EventData> batchEvents = new ArrayList<>();
+ setupBatchMock(batch, batchEvents, event1);
+
+ final List<EventData> batchEvents2 = new ArrayList<>();
+ setupBatchMock(batch2, batchEvents2, event2, event3);
+
+ final List<EventData> batchEvents3 = new ArrayList<>();
+ final EventData event5 = new EventData("five");
+ setupBatchMock(batch3, batchEvents3, event4, event5);
+
+ final List<EventData> batchEvents4 = new ArrayList<>();
+ setupBatchMock(batch4, batchEvents4, event2, event3, event4, event5);
+
+ final List<EventData> batchEvents5 = new ArrayList<>();
+ setupBatchMock(batch5, batchEvents5, event2, event3, event4, event5);
+
+ // Delaying send operation.
+ when(client.send(any(EventDataBatch.class)))
+ .thenAnswer(invocation -> Mono.delay(options.getMaxWaitTime()).then());
+
+ final EventHubBufferedPartitionProducer producer = new EventHubBufferedPartitionProducer(client, PARTITION_ID,
+ options, DEFAULT_RETRY_OPTIONS);
+
+ // Act & Assert
+ StepVerifier.create(Mono.when(producer.enqueueEvent(event1), producer.enqueueEvent(event2),
+ producer.enqueueEvent(event3)), 1L)
+ .then(() -> {
+ // event1 was enqueued, event2 is in a batch, and event3 is currently in the queue waiting to be
+ // pushed downstream.
+ // batch1 (with event1) is being sent at the moment with the delay of options.getMaxWaitTime(), so the
+ // buffer doesn't drain so quickly.
+ final int bufferedEventCount = producer.getBufferedEventCount();
+ assertEquals(1, bufferedEventCount);
+ })
+ .expectComplete()
+ .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout());
+
+ StepVerifier.create(Mono.when(producer.enqueueEvent(event4), producer.enqueueEvent(event5)))
+ .thenAwait(options.getMaxWaitTime())
+ .expectComplete()
+ .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout());
+
+ assertTrue(success.await(DEFAULT_RETRY_OPTIONS.getTryTimeout().toMillis(), TimeUnit.MILLISECONDS),
+ "Should have been able to get a successful signal downstream.");
+
+ assertTrue(1 <= holder.succeededContexts.size(), "Expected at least 1 succeeded contexts. Actual: " + holder.succeededContexts.size());
+
+ // Verify the completed ones.
+ final SendBatchSucceededContext first = holder.succeededContexts.get(0);
+ assertEquals(PARTITION_ID, first.getPartitionId());
+ assertEquals(batchEvents, first.getEvents());
+
+ if (holder.succeededContexts.size() > 1) {
+ final SendBatchSucceededContext second = holder.succeededContexts.get(1);
+ assertEquals(PARTITION_ID, second.getPartitionId());
+ assertEquals(batchEvents2, second.getEvents());
+ }
+ }
+
+ private class InvocationHolder {
+ private final List<SendBatchSucceededContext> succeededContexts = new ArrayList<>();
+ private final List<SendBatchFailedContext> failedContexts = new ArrayList<>();
+
+ void onSucceed(SendBatchSucceededContext result) {
+ succeededContexts.add(result);
+ successSemaphore.release();
+ }
+
+ void onFailed(SendBatchFailedContext result) {
+ failedContexts.add(result);
+ failedSemaphore.release();
+ }
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClientIntegrationTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClientIntegrationTest.java
new file mode 100644
index 000000000000..f7734ea1533d
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClientIntegrationTest.java
@@ -0,0 +1,272 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package com.azure.messaging.eventhubs;
+
+import com.azure.core.util.logging.ClientLogger;
+import com.azure.messaging.eventhubs.models.SendBatchSucceededContext;
+import com.azure.messaging.eventhubs.models.SendOptions;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Isolated;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Tests for {@link EventHubBufferedProducerAsyncClient}.
+ */
+@Isolated
+@Tag(TestUtils.INTEGRATION)
+public class EventHubBufferedProducerAsyncClientIntegrationTest extends IntegrationTestBase {
+ private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss")
+ .withLocale(Locale.US)
+ .withZone(ZoneId.of("America/Los_Angeles"));
+ private EventHubBufferedProducerAsyncClient producer;
+ private EventHubClient hubClient;
+ private String[] partitionIds;
+ private final Map partitionPropertiesMap = new HashMap<>();
+
+ public EventHubBufferedProducerAsyncClientIntegrationTest() {
+ super(new ClientLogger(EventHubBufferedProducerAsyncClientIntegrationTest.class));
+ }
+
+ @Override
+ protected void beforeTest() {
+ this.hubClient = new EventHubClientBuilder().connectionString(getConnectionString())
+ .buildClient();
+
+ List allIds = new ArrayList<>();
+ final EventHubProperties properties = hubClient.getProperties();
+
+ properties.getPartitionIds().forEach(id -> {
+ allIds.add(id);
+
+ final PartitionProperties partitionProperties = hubClient.getPartitionProperties(id);
+ partitionPropertiesMap.put(id, partitionProperties);
+ });
+
+ this.partitionIds = allIds.toArray(new String[0]);
+
+ assertFalse(partitionPropertiesMap.isEmpty(), "'partitionPropertiesMap' should have values.");
+ }
+
+ @Override
+ protected void afterTest() {
+ if (hubClient != null) {
+ hubClient.close();
+ }
+
+ if (producer != null) {
+ producer.close();
+ }
+ }
+
+ /**
+ * Checks that we can publish round-robin.
+ *
+ * @throws InterruptedException If the semaphore cannot be awaited.
+ */
+ @Test
+ public void publishRoundRobin() throws InterruptedException {
+ // Arrange
+ final CountDownLatch countDownLatch = new CountDownLatch(partitionPropertiesMap.size());
+ final AtomicBoolean anyFailures = new AtomicBoolean(false);
+
+ final Duration maxWaitTime = Duration.ofSeconds(5);
+ final int queueSize = 10;
+
+ producer = new EventHubBufferedProducerClientBuilder()
+ .connectionString(getConnectionString())
+ .retryOptions(RETRY_OPTIONS)
+ .onSendBatchFailed(failed -> {
+ anyFailures.set(true);
+ fail("Exception occurred while sending messages." + failed.getThrowable());
+ })
+ .onSendBatchSucceeded(succeeded -> {
+ countDownLatch.countDown();
+ })
+ .maxEventBufferLengthPerPartition(queueSize)
+ .maxWaitTime(maxWaitTime)
+ .buildAsyncClient();
+
+ // Creating 2x number of events, we expect that each partition will get at least one of these events.
+ final int numberOfEvents = partitionPropertiesMap.size() * 2;
+ final List eventsToPublish = IntStream.range(0, numberOfEvents)
+ .mapToObj(index -> new EventData(String.valueOf(index)))
+ .collect(Collectors.toList());
+
+ // Waiting for at least maxWaitTime because events will get published by then.
+ StepVerifier.create(producer.enqueueEvents(eventsToPublish))
+ .assertNext(integer -> {
+ assertEquals(0, integer, "Do not expect anymore events in queue.");
+ })
+ .thenAwait(maxWaitTime)
+ .expectComplete()
+ .verify(TIMEOUT);
+
+ assertTrue(countDownLatch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS), "Did not get enough messages.");
+
+ // Assert
+ final Map propertiesAfterMap = producer.getEventHubProperties()
+ .flatMapMany(properties -> {
+ return Flux.fromIterable(properties.getPartitionIds())
+ .flatMap(id -> producer.getPartitionProperties(id));
+ })
+ .collectMap(properties -> properties.getId(), Function.identity())
+ .block(TIMEOUT);
+
+ assertNotNull(propertiesAfterMap, "'partitionPropertiesMap' should not be null");
+
+ assertFalse(anyFailures.get(), "Should not have encountered any failures.");
+ assertTrue(countDownLatch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS),
+ "Should have sent x batches where x is the number of partitions.");
+
+ // Check that the offsets have increased because we have published some events.
+ assertPropertiesUpdated(partitionPropertiesMap, propertiesAfterMap);
+ }
+
+ /**
+ * Checks that sending an iterable with multiple partition keys is successful.
+ */
+ @Test
+ public void publishWithPartitionKeys() throws InterruptedException {
+ // Arrange
+ final int numberOfEvents = partitionPropertiesMap.size() * 4;
+
+ final AtomicBoolean anyFailures = new AtomicBoolean(false);
+ final List succeededContexts = new ArrayList<>();
+ final CountDownLatch eventCountdown = new CountDownLatch(numberOfEvents);
+
+ final Duration maxWaitTime = Duration.ofSeconds(15);
+ final int queueSize = 10;
+
+ producer = new EventHubBufferedProducerClientBuilder()
+ .connectionString(getConnectionString())
+ .retryOptions(RETRY_OPTIONS)
+ .onSendBatchFailed(failed -> {
+ anyFailures.set(true);
+ fail("Exception occurred while sending messages." + failed.getThrowable());
+ })
+ .onSendBatchSucceeded(succeeded -> {
+ succeededContexts.add(succeeded);
+ succeeded.getEvents().forEach(e -> eventCountdown.countDown());
+ })
+ .maxEventBufferLengthPerPartition(queueSize)
+ .maxWaitTime(maxWaitTime)
+ .buildAsyncClient();
+
+ final Random randomInterval = new Random(10);
+ final Map> expectedPartitionIdsMap = new HashMap<>();
+ final PartitionResolver resolver = new PartitionResolver();
+
+ final List> publishEventMono = IntStream.range(0, numberOfEvents)
+ .mapToObj(index -> {
+ final String partitionKey = "partition-" + index;
+ final EventData eventData = new EventData(partitionKey);
+ final SendOptions sendOptions = new SendOptions().setPartitionKey(partitionKey);
+ final int delay = randomInterval.nextInt(20);
+
+ final String expectedPartitionId = resolver.assignForPartitionKey(partitionKey, partitionIds);
+
+ expectedPartitionIdsMap.compute(expectedPartitionId, (key, existing) -> {
+ if (existing == null) {
+ List events = new ArrayList<>();
+ events.add(partitionKey);
+ return events;
+ } else {
+ existing.add(partitionKey);
+ return existing;
+ }
+ });
+
+ return Mono.delay(Duration.ofSeconds(delay)).then(producer.enqueueEvent(eventData, sendOptions)
+ .doFinally(signal -> {
+ System.out.printf("\t[%s] %s Published event.%n", expectedPartitionId, formatter.format(Instant.now()));
+ }));
+ }).collect(Collectors.toList());
+
+ // Waiting for at least maxWaitTime because events will get published by then.
+ StepVerifier.create(Mono.when(publishEventMono))
+ .expectComplete()
+ .verify(TIMEOUT);
+
+ final boolean await = eventCountdown.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
+
+ assertFalse(anyFailures.get(), "Should not have encountered any failures.");
+ assertFalse(succeededContexts.isEmpty(), "Should have successfully sent some messages.");
+
+ for (SendBatchSucceededContext context : succeededContexts) {
+ final List expected = expectedPartitionIdsMap.get(context.getPartitionId());
+ assertNotNull(expected, "Did not find any expected for partitionId: " + context.getPartitionId());
+
+ context.getEvents().forEach(eventData -> {
+ final boolean success = expected.removeIf(key -> key.equals(eventData.getBodyAsString()));
+ assertTrue(success, "Unable to find key " + eventData.getBodyAsString()
+ + " in partition id: " + context.getEvents());
+ });
+ }
+
+ expectedPartitionIdsMap.forEach((key, value) -> {
+ assertTrue(value.isEmpty(), key + ": There should be no more partition keys. "
+ + String.join(",", value));
+ });
+
+ final Map finalProperties = getPartitionProperties();
+ assertPropertiesUpdated(partitionPropertiesMap, finalProperties);
+ }
+
+ private Map getPartitionProperties() {
+ final EventHubProperties properties1 = this.hubClient.getProperties();
+ final Map result = new HashMap<>();
+
+ properties1.getPartitionIds().forEach(id -> {
+ final PartitionProperties props = hubClient.getPartitionProperties(id);
+ result.put(id, props);
+ });
+
+ return result;
+ }
+
+ private static void assertPropertiesUpdated(Map initial,
+ Map afterwards) {
+
+ // Check that the offsets have increased because we have published some events.
+ initial.forEach((key, before) -> {
+ final PartitionProperties after = afterwards.get(key);
+
+ assertNotNull(after, "did not get properties for key: " + key);
+ assertEquals(before.getEventHubName(), after.getEventHubName());
+ assertEquals(before.getId(), after.getId());
+
+ assertTrue(after.getLastEnqueuedTime().isAfter(before.getLastEnqueuedTime()),
+ "Last enqueued time should be newer");
+
+ assertTrue(before.getLastEnqueuedSequenceNumber() < after.getLastEnqueuedSequenceNumber(),
+ "Sequence number should be greater.");
+ });
+ }
+}
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientIntegrationTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientIntegrationTest.java
index 4e4903b8d83c..e55da016714c 100644
--- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientIntegrationTest.java
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientIntegrationTest.java
@@ -42,6 +42,13 @@ protected void beforeTest() {
.buildAsyncProducerClient();
}
+ @Override
+ protected void afterTest() {
+ if (producer != null) {
+ producer.close();
+ }
+ }
+
/**
* Verifies that we can create and send a message to an Event Hub partition.
*/
diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/sdk/eventhubs/azure-messaging-eventhubs/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 000000000000..1f0955d450f0
--- /dev/null
+++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline