diff --git a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md index 5b9b8557bc2f..1e8711856671 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md @@ -4,8 +4,9 @@ ### Features Added +- Added algorithm for mapping partition keys to partition ids. - Added identifier to client. ([#22981](https://github.com/Azure/azure-sdk-for-java/issues/22981)) -- Adds algorithm for mapping partition keys to partition ids. +- Added EventHubBufferedProducerAsyncClient and EventHubBufferedProducerClient ### Breaking Changes diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java index da0448cfc2f6..6bf97eb78eb4 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java @@ -17,12 +17,13 @@ import reactor.core.publisher.Operators; import reactor.core.publisher.Sinks; -import java.time.Duration; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY; + /** * Aggregates {@link EventData} into {@link EventDataBatch} and pushes them downstream when: * @@ -33,10 +34,12 @@ */ class EventDataAggregator extends FluxOperator { private static final ClientLogger LOGGER = new ClientLogger(EventDataAggregator.class); + private final AtomicReference downstreamSubscription = new AtomicReference<>(); private final Supplier batchSupplier; private final String namespace; private final 
BufferedProducerClientOptions options; + private final String partitionId; /** * Build a {@link FluxOperator} wrapper around the passed parent {@link Publisher} @@ -44,9 +47,10 @@ class EventDataAggregator extends FluxOperator { * @param source the {@link Publisher} to decorate */ EventDataAggregator(Flux source, Supplier batchSupplier, - String namespace, BufferedProducerClientOptions options) { + String namespace, BufferedProducerClientOptions options, String partitionId) { super(source); + this.partitionId = partitionId; this.batchSupplier = batchSupplier; this.namespace = namespace; this.options = options; @@ -61,7 +65,7 @@ class EventDataAggregator extends FluxOperator { @Override public void subscribe(CoreSubscriber actual) { final EventDataAggregatorMain subscription = new EventDataAggregatorMain(actual, namespace, options, - batchSupplier, LOGGER); + batchSupplier, partitionId, LOGGER); if (!downstreamSubscription.compareAndSet(null, subscription)) { throw LOGGER.logThrowableAsError(new IllegalArgumentException( @@ -82,13 +86,13 @@ static class EventDataAggregatorMain implements Subscription, CoreSubscriber REQUESTED = AtomicLongFieldUpdater.newUpdater(EventDataAggregatorMain.class, "requested"); - private static final Duration MAX_TIME = Duration.ofMillis(Long.MAX_VALUE); - private final Sinks.Many eventSink; private final Disposable disposable; private final AtomicBoolean isCompleted = new AtomicBoolean(false); private final CoreSubscriber downstream; + + private final String partitionId; private final ClientLogger logger; private final Supplier batchSupplier; private final String namespace; @@ -97,25 +101,25 @@ static class EventDataAggregatorMain implements Subscription, CoreSubscriber downstream, String namespace, - BufferedProducerClientOptions options, Supplier batchSupplier, ClientLogger logger) { + BufferedProducerClientOptions options, Supplier batchSupplier, String partitionId, + ClientLogger logger) { this.namespace = namespace; this.downstream 
= downstream; + this.partitionId = partitionId; this.logger = logger; this.batchSupplier = batchSupplier; this.currentBatch = batchSupplier.get(); this.eventSink = Sinks.many().unicast().onBackpressureError(); - this.disposable = eventSink.asFlux() - .switchMap(value -> { - if (value == 0) { - return Flux.interval(MAX_TIME, MAX_TIME); - } else { - return Flux.interval(options.getMaxWaitTime(), options.getMaxWaitTime()); - } - }) - .subscribe(next -> { - logger.verbose("Time elapsed. Publishing batch."); + this.disposable = Flux.switchOnNext(eventSink.asFlux().map(e -> Flux.interval(options.getMaxWaitTime()) + .takeUntil(index -> isCompleted.get()))) + .subscribe(index -> { + logger.atVerbose() + .addKeyValue(PARTITION_ID_KEY, partitionId) + .log("Time elapsed. Attempt to publish downstream."); updateOrPublishBatch(null, true); }); } @@ -140,7 +144,14 @@ public void request(long n) { */ @Override public void cancel() { + if (!isCompleted.compareAndSet(false, true)) { + return; + } + // Do not keep requesting more events upstream + logger.atVerbose() + .addKeyValue(PARTITION_ID_KEY, partitionId) + .log("Disposing of aggregator."); subscription.cancel(); updateOrPublishBatch(null, true); @@ -166,7 +177,7 @@ public void onNext(EventData eventData) { eventSink.emitNext(1L, Sinks.EmitFailureHandler.FAIL_FAST); // When an EventDataBatch is pushed downstream, we decrement REQUESTED. However, if REQUESTED is still > 0, - // that means we did not publish the EventDataBatch (ie. because it was not full). We request another + // that means we did not publish the EventDataBatch (i.e. because it was not full). We request another // EventData upstream to try and fill this EventDataBatch and push it downstream. 
final long left = REQUESTED.get(this); if (left > 0) { @@ -268,6 +279,22 @@ private void publishDownstream() { this.currentBatch = null; } } + } catch (EventHubBufferedPartitionProducer.UncheckedInterruptedException exception) { + logger.info("An exception occurred while trying to get a new batch.", exception); + + if (this.lastError != null) { + logger.info("Exception has been set already, terminating EventDataAggregator."); + + final Throwable error = Operators.onNextError(previous, exception, downstream.currentContext(), + subscription); + + if (error != null) { + onError(error); + } + } else { + this.lastError = exception; + } + } catch (Throwable e) { final Throwable error = Operators.onNextError(previous, e, downstream.currentContext(), subscription); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java index 2ed75c9cb933..f36c5d61e197 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java @@ -3,6 +3,7 @@ package com.azure.messaging.eventhubs; +import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.amqp.exception.AmqpErrorContext; import com.azure.core.amqp.exception.AmqpException; import com.azure.core.util.logging.ClientLogger; @@ -15,87 +16,135 @@ import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.core.publisher.MonoSink; import reactor.core.publisher.Sinks; import reactor.core.scheduler.Schedulers; import reactor.util.concurrent.Queues; -import reactor.util.retry.Retry; import java.io.Closeable; +import java.time.Duration; +import java.util.ArrayList; 
+import java.util.List; import java.util.Queue; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Supplier; +import static com.azure.core.amqp.implementation.RetryUtil.withRetry; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.EMIT_RESULT_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY; +import static com.azure.messaging.eventhubs.implementation.ClientConstants.SIGNAL_TYPE_KEY; + /** * Keeps track of publishing events to a partition. */ class EventHubBufferedPartitionProducer implements Closeable { - private final ClientLogger logger; + private static final ClientLogger LOGGER = new ClientLogger(EventHubBufferedPartitionProducer.class); + + private final AmqpRetryOptions retryOptions; private final EventHubProducerAsyncClient client; private final String partitionId; private final AmqpErrorContext errorContext; private final AtomicBoolean isClosed = new AtomicBoolean(false); private final Disposable publishSubscription; private final Sinks.Many eventSink; - private final Retry retryWhenPolicy = Retry.from(signal -> { - if (isClosed.get()) { - return Mono.empty(); - } else { - return Mono.just(true); - } - }); private final CreateBatchOptions createBatchOptions; private final Queue eventQueue; + private final AtomicBoolean isFlushing = new AtomicBoolean(false); + private final Semaphore flushSemaphore = new Semaphore(1); + private final PublishResultSubscriber publishResultSubscriber; EventHubBufferedPartitionProducer(EventHubProducerAsyncClient client, String partitionId, - BufferedProducerClientOptions options) { + BufferedProducerClientOptions options, AmqpRetryOptions retryOptions) { this.client = client; this.partitionId = partitionId; this.errorContext 
= new AmqpErrorContext(client.getFullyQualifiedNamespace()); this.createBatchOptions = new CreateBatchOptions().setPartitionId(partitionId); - - this.logger = new ClientLogger(EventHubBufferedPartitionProducer.class + "-" + partitionId); + this.retryOptions = retryOptions; final Supplier> queueSupplier = Queues.get(options.getMaxEventBufferLengthPerPartition()); this.eventQueue = queueSupplier.get(); this.eventSink = Sinks.many().unicast().onBackpressureBuffer(eventQueue); - final Flux eventDataBatchFlux = Flux.defer(() -> { - return new EventDataAggregator(eventSink.asFlux(), - () -> { - final Mono batch = client.createBatch(createBatchOptions); - try { - return batch.toFuture().get(); - } catch (InterruptedException | ExecutionException e) { - throw new RuntimeException("Unexpected exception when getting batch.", e); - } - }, - client.getFullyQualifiedNamespace(), options); - }).retryWhen(retryWhenPolicy); + final Flux eventDataBatchFlux = new EventDataAggregator(eventSink.asFlux(), + this::createNewBatch, client.getFullyQualifiedNamespace(), options, partitionId); - final PublishResultSubscriber publishResultSubscriber = new PublishResultSubscriber(partitionId, - options.getSendSucceededContext(), options.getSendFailedContext(), logger); + this.publishResultSubscriber = new PublishResultSubscriber(partitionId, + options.getSendSucceededContext(), options.getSendFailedContext(), eventQueue, flushSemaphore, isFlushing, + retryOptions.getTryTimeout(), LOGGER); this.publishSubscription = publishEvents(eventDataBatchFlux) .publishOn(Schedulers.boundedElastic(), 1) .subscribeWith(publishResultSubscriber); } + /** + * Enqueues an event into the queue. + * + * @param eventData Event to enqueue. + * + * @return A mono that completes when it is in the queue. + * + * @throws IllegalStateException if the partition processor is already closed when trying to enqueue another + * event. 
+ */ Mono enqueueEvent(EventData eventData) { - return Mono.create(sink -> { - sink.onRequest(request -> { - try { - eventSink.emitNext(eventData, (signalType, emitResult) -> { - // If the draining queue is slower than the publishing queue. - return emitResult == Sinks.EmitResult.FAIL_OVERFLOW; - }); - sink.success(); - } catch (Exception e) { - sink.error(new AmqpException(false, "Unable to buffer message for partition: " + getPartitionId(), errorContext)); + final Mono enqueueOperation = Mono.create(sink -> { + if (isClosed.get()) { + sink.error(new IllegalStateException(String.format( + "Partition publisher id[%s] is already closed. Cannot enqueue more events.", partitionId))); + return; + } + + try { + if (isFlushing.get() + && !flushSemaphore.tryAcquire(retryOptions.getTryTimeout().toMillis(), TimeUnit.MILLISECONDS)) { + + sink.error(new TimeoutException("Timed out waiting for flush operation to complete.")); + return; } - }); + } catch (InterruptedException e) { + // Unsure whether this is recoverable by trying again? Maybe, since this could be scheduled on + // another thread. + sink.error(new TimeoutException("Unable to acquire flush semaphore due to interrupted exception.")); + return; + } + + try { + if (isClosed.get()) { + sink.error(new IllegalStateException(String.format("Partition publisher id[%s] was " + + "closed between flushing events and now. Cannot enqueue events.", partitionId))); + return; + } + + eventSink.emitNext(eventData, (signalType, emitResult) -> { + // If the draining queue is slower than the publishing queue. + LOGGER.atInfo() + .addKeyValue(PARTITION_ID_KEY, partitionId) + .addKeyValue(SIGNAL_TYPE_KEY, signalType) + .addKeyValue(EMIT_RESULT_KEY, emitResult) + .log("Could not push event downstream."); + switch (emitResult) { + case FAIL_OVERFLOW: + case FAIL_NON_SERIALIZED: + return true; + default: + LOGGER.info("Not trying to emit again. 
EmitResult: {}", emitResult); + return false; + } + }); + sink.success(); + } catch (Exception e) { + sink.error(new AmqpException(false, "Unable to buffer message for partition: " + getPartitionId(), e, + errorContext)); + } }); + + return withRetry(enqueueOperation, retryOptions, "Timed out trying to enqueue event data.", true); } /** @@ -116,14 +165,30 @@ int getBufferedEventCount() { return eventQueue.size(); } + /** + * Flushes all the events in the queue. Does not allow for any additional events to be enqueued as it is being + * flushed. + * + * @return A Mono that completes when all events are flushed. + */ + Mono flush() { + return publishResultSubscriber.startFlush(); + } + @Override public void close() { if (isClosed.getAndSet(true)) { return; } - publishSubscription.dispose(); - client.close(); + try { + publishResultSubscriber.startFlush().block(retryOptions.getTryTimeout()); + } catch (IllegalStateException e) { + LOGGER.info("Timed out waiting for flush to complete.", e); + } finally { + publishSubscription.dispose(); + client.close(); + } } /** @@ -132,15 +197,29 @@ public void close() { * @return A stream of published results. */ private Flux publishEvents(Flux upstream) { + return upstream.flatMap(batch -> { + return client.send(batch).thenReturn(new PublishResult(batch, null)) + // Resuming on error because an error is a terminal signal, so we want to wrap that with a result, + // so it doesn't stop publishing. + .onErrorResume(error -> Mono.just(new PublishResult(batch, error))); + }, 1, 1); + } - return Flux.defer(() -> { - return upstream.flatMap(batch -> { - return client.send(batch).thenReturn(new PublishResult(batch, null)) - // Resuming on error because an error is a terminal signal, so we want to wrap that with a result, - // so it doesn't stop publishing. - .onErrorResume(error -> Mono.just(new PublishResult(batch, error))); - }, 1, 1); - }).retryWhen(retryWhenPolicy); + /** + * Creates a new batch. 
+ * + * @return A new EventDataBatch + * + * @throws UncheckedInterruptedException If an exception occurred when trying to create a new batch. It is + * possible when the thread is interrupted while creating the batch. + */ + private EventDataBatch createNewBatch() { + final Mono batch = client.createBatch(createBatchOptions); + try { + return batch.toFuture().get(); + } catch (InterruptedException | ExecutionException e) { + throw LOGGER.logExceptionAsError(new UncheckedInterruptedException(e)); + } } /** @@ -157,23 +236,33 @@ private static class PublishResult { } private static class PublishResultSubscriber extends BaseSubscriber { - private static final long REQUEST = 1L; private final String partitionId; private final Consumer onSucceed; private final Consumer onFailed; + private final Queue dataQueue; + private final Duration operationTimeout; private final ClientLogger logger; + private final AtomicBoolean isFlushing; + private final Semaphore flushSemaphore; + private MonoSink flushSink; + PublishResultSubscriber(String partitionId, Consumer onSucceed, - Consumer onFailed, ClientLogger logger) { + Consumer onFailed, Queue dataQueue, Semaphore flushSemaphore, + AtomicBoolean flush, Duration operationTimeout, ClientLogger logger) { this.partitionId = partitionId; this.onSucceed = onSucceed; this.onFailed = onFailed; + this.dataQueue = dataQueue; + this.flushSemaphore = flushSemaphore; + this.isFlushing = flush; + this.operationTimeout = operationTimeout; this.logger = logger; } @Override protected void hookOnSubscribe(Subscription subscription) { - upstream().request(REQUEST); + requestUnbounded(); } @Override @@ -184,19 +273,97 @@ protected void hookOnNext(PublishResult result) { onFailed.accept(new SendBatchFailedContext(result.batch.getEvents(), partitionId, result.error)); } - // Request one more PublishResult, which is equivalent to asking for another batch. 
- upstream().request(REQUEST); + tryCompleteFlush(); } @Override protected void hookOnError(Throwable throwable) { - logger.error("Publishing subscription completed and ended in an error.", throwable); + logger.atError() + .addKeyValue(PARTITION_ID_KEY, partitionId) + .log("Publishing subscription completed and ended in an error.", throwable); + onFailed.accept(new SendBatchFailedContext(null, partitionId, throwable)); + + tryCompleteFlush(); } @Override protected void hookOnComplete() { - logger.info("Publishing subscription completed."); + logger.atInfo() + .addKeyValue(PARTITION_ID_KEY, partitionId) + .log("Publishing subscription completed. Clearing rest of queue."); + + final List events = new ArrayList<>(this.dataQueue); + this.dataQueue.clear(); + + onFailed.accept(new SendBatchFailedContext(events, partitionId, null)); + + tryCompleteFlush(); + } + + /** + * Flushes the queue. Releases semaphore when it is complete. + * + * @throws NullPointerException if {@code semaphore} or {@code sink} is null. + */ + Mono startFlush() { + return Mono.create(sink -> { + if (!isFlushing.compareAndSet(false, true)) { + logger.atInfo() + .addKeyValue(PARTITION_ID_KEY, partitionId) + .log("Flush operation already in progress."); + sink.success(); + return; + } + + this.flushSink = sink; + try { + if (!flushSemaphore.tryAcquire(operationTimeout.toMillis(), TimeUnit.MILLISECONDS)) { + sink.error(new TimeoutException("Unable to acquire flush semaphore to begin timeout operation.")); + } + + tryCompleteFlush(); + } catch (InterruptedException e) { + logger.atWarning() + .addKeyValue(PARTITION_ID_KEY, partitionId) + .log("Unable to acquire flush semaphore."); + + sink.error(e); + } + }); + } + + /** + * Checks whether data queue is empty, if it is, completes the flush. + */ + private void tryCompleteFlush() { + if (!isFlushing.get()) { + return; + } + + if (!dataQueue.isEmpty()) { + logger.atVerbose() + .addKeyValue(PARTITION_ID_KEY, partitionId) + .log("Data queue is not empty. 
Not completing flush."); + return; + } + + logger.atVerbose() + .addKeyValue(PARTITION_ID_KEY, partitionId) + .log("Completing flush operation."); + + if (flushSemaphore != null) { + flushSemaphore.release(); + } + + isFlushing.compareAndSet(true, false); + flushSink.success(); + } + } + + static class UncheckedInterruptedException extends RuntimeException { + UncheckedInterruptedException(Throwable error) { + super("Unable to fetch batch.", error); } } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient.java index a6c00ea851b5..c32d171c141b 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient.java @@ -3,9 +3,11 @@ package com.azure.messaging.eventhubs; +import com.azure.core.amqp.AmqpRetryOptions; import com.azure.core.annotation.ReturnType; import com.azure.core.annotation.ServiceClient; import com.azure.core.annotation.ServiceMethod; +import com.azure.core.util.CoreUtils; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.models.SendBatchFailedContext; import com.azure.messaging.eventhubs.models.SendBatchSucceededContext; @@ -15,8 +17,16 @@ import java.io.Closeable; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static com.azure.core.util.FluxUtil.monoError; /** * A client responsible for publishing instances of {@link EventData} to a 
specific Event Hub. Depending on the options @@ -51,36 +61,44 @@ */ @ServiceClient(builder = EventHubBufferedProducerClientBuilder.class, isAsync = true) public final class EventHubBufferedProducerAsyncClient implements Closeable { + private static final SendOptions ROUND_ROBIN_SEND_OPTIONS = new SendOptions(); + private final ClientLogger logger = new ClientLogger(EventHubBufferedProducerAsyncClient.class); + private final AtomicBoolean isClosed = new AtomicBoolean(false); private final EventHubProducerAsyncClient client; - private final EventHubClientBuilder builder; private final BufferedProducerClientOptions clientOptions; + private final PartitionResolver partitionResolver; private final Mono initialisationMono; + private final Mono partitionIdsMono; // Key: partitionId. private final ConcurrentHashMap partitionProducers = new ConcurrentHashMap<>(); + private final AmqpRetryOptions retryOptions; - EventHubBufferedProducerAsyncClient(EventHubClientBuilder builder, BufferedProducerClientOptions clientOptions) { - this.builder = builder; + EventHubBufferedProducerAsyncClient(EventHubClientBuilder builder, BufferedProducerClientOptions clientOptions, + PartitionResolver partitionResolver, AmqpRetryOptions retryOptions) { this.client = builder.buildAsyncProducerClient(); this.clientOptions = clientOptions; - - this.initialisationMono = Mono.using( - () -> builder.buildAsyncClient(), - eventHubClient -> { - return eventHubClient.getPartitionIds() - .handle((partitionId, sink) -> { - try { - partitionProducers.put(partitionId, new EventHubBufferedPartitionProducer(client, - partitionId, clientOptions)); - sink.complete(); - } catch (Exception e) { - sink.error(e); - } - }).then(); - }, - eventHubClient -> eventHubClient.close()).cache(); + this.partitionResolver = partitionResolver; + this.retryOptions = retryOptions; + + final Mono partitionProducerFluxes = this.client.getEventHubProperties() + .flatMapMany(property -> { + final String[] as = 
property.getPartitionIds().stream().toArray(String[]::new); + return Flux.fromArray(as); + }) + .map(partitionId -> { + return partitionProducers.computeIfAbsent(partitionId, key -> { + return new EventHubBufferedPartitionProducer(client, key, clientOptions, retryOptions); + }); + }).then(); + + this.initialisationMono = partitionProducerFluxes.cache(); + + this.partitionIdsMono = initialisationMono.then(Mono.fromCallable(() -> { + return new ArrayList<>(partitionProducers.keySet()).toArray(new String[0]); + })).cache(); } /** @@ -109,7 +127,7 @@ public String getEventHubName() { */ @ServiceMethod(returns = ReturnType.SINGLE) public Mono getEventHubProperties() { - return client.getEventHubProperties(); + return initialisationMono.then(Mono.defer(() -> client.getEventHubProperties())); } /** @@ -119,7 +137,7 @@ public Mono getEventHubProperties() { */ @ServiceMethod(returns = ReturnType.COLLECTION) public Flux getPartitionIds() { - return client.getPartitionIds(); + return partitionIdsMono.flatMapMany(ids -> Flux.fromArray(ids)); } /** @@ -131,9 +149,16 @@ public Flux getPartitionIds() { * @return The set of information for the requested partition under the Event Hub this client is associated with. * * @throws NullPointerException if {@code partitionId} is null. + * @throws IllegalArgumentException if {@code partitionId} is empty. */ @ServiceMethod(returns = ReturnType.SINGLE) public Mono getPartitionProperties(String partitionId) { + if (Objects.isNull(partitionId)) { + return monoError(logger, new NullPointerException("'partitionId' cannot be null.")); + } else if (CoreUtils.isNullOrEmpty(partitionId)) { + return monoError(logger, new IllegalArgumentException("'partitionId' cannot be empty.")); + } + return client.getPartitionProperties(partitionId); } @@ -156,6 +181,9 @@ public int getBufferedEventCount() { * @param partitionId The partition identifier. * * @return The number of events that are buffered and waiting to be published for a given partition. 
+ * + * @throws NullPointerException if {@code partitionId} is null. + * @throws IllegalArgumentException if {@code partitionId} is empty. */ public int getBufferedEventCount(String partitionId) { final EventHubBufferedPartitionProducer producer = partitionProducers.get(partitionId); @@ -175,9 +203,12 @@ public int getBufferedEventCount(String partitionId) { * * @return The total number of events that are currently buffered and waiting to be published, across all * partitions. + * + * @throws NullPointerException if {@code eventData} is null. + * @throws IllegalStateException if the producer was closed while queueing an event. */ public Mono enqueueEvent(EventData eventData) { - return null; + return enqueueEvent(eventData, ROUND_ROBIN_SEND_OPTIONS); } /** @@ -189,13 +220,62 @@ public Mono enqueueEvent(EventData eventData) { * published yet. Publishing will take place at a nondeterministic point in the future as the buffer is processed. * * @param eventData The event to be enqueued into the buffer and, later, published. - * @param options The set of options to apply when publishing this event. + * @param options The set of options to apply when publishing this event. If partitionKey and partitionId are + * not set, then the event is distributed round-robin amongst all the partitions. * * @return The total number of events that are currently buffered and waiting to be published, across all * partitions. + * + * @throws NullPointerException if {@code eventData} or {@code options} is null. + * @throws IllegalArgumentException if {@link SendOptions#getPartitionId() getPartitionId} is set and is not + * valid. + * @throws IllegalStateException if the producer was closed while queueing an event. 
*/ public Mono enqueueEvent(EventData eventData, SendOptions options) { - return null; + if (eventData == null) { + return monoError(logger, new NullPointerException("'eventData' cannot be null.")); + } else if (options == null) { + return monoError(logger, new NullPointerException("'options' cannot be null.")); + } + + if (!CoreUtils.isNullOrEmpty(options.getPartitionId())) { + if (!partitionProducers.containsKey(options.getPartitionId())) { + return monoError(logger, new IllegalArgumentException("partitionId is not valid. Available ones: " + + String.join(",", partitionProducers.keySet()))); + } + + final EventHubBufferedPartitionProducer producer = + partitionProducers.computeIfAbsent(options.getPartitionId(), key -> { + return new EventHubBufferedPartitionProducer(client, key, clientOptions, retryOptions); + }); + + return producer.enqueueEvent(eventData).thenReturn(getBufferedEventCount()); + } + + if (options.getPartitionKey() != null) { + return partitionIdsMono.flatMap(ids -> { + final String partitionId = partitionResolver.assignForPartitionKey(options.getPartitionKey(), ids); + final EventHubBufferedPartitionProducer producer = partitionProducers.get(partitionId); + if (producer == null) { + return monoError(logger, new IllegalArgumentException( + String.format("Unable to find EventHubBufferedPartitionProducer for partitionId: %s when " + + "mapping partitionKey: %s to available partitions.", partitionId, + options.getPartitionKey()))); + } + + return producer.enqueueEvent(eventData).thenReturn(getBufferedEventCount()); + }); + } else { + return partitionIdsMono.flatMap(ids -> { + final String partitionId = partitionResolver.assignRoundRobin(ids); + final EventHubBufferedPartitionProducer producer = + partitionProducers.computeIfAbsent(partitionId, key -> { + return new EventHubBufferedPartitionProducer(client, key, clientOptions, retryOptions); + }); + + return producer.enqueueEvent(eventData).thenReturn(getBufferedEventCount()); + }); + } } /** @@ 
-210,9 +290,12 @@ public Mono enqueueEvent(EventData eventData, SendOptions options) { * * @return The total number of events that are currently buffered and waiting to be published, across all * partitions. + * + * @throws NullPointerException if {@code events} is null. + * @throws IllegalStateException if the producer was closed while queueing an event. */ public Mono enqueueEvents(Iterable events) { - return null; + return enqueueEvents(events, ROUND_ROBIN_SEND_OPTIONS); } /** @@ -228,9 +311,25 @@ public Mono enqueueEvents(Iterable events) { * * @return The total number of events that are currently buffered and waiting to be published, across all * partitions. + * + * @throws NullPointerException if {@code eventData} or {@code options} is null. + * @throws IllegalArgumentException if {@link SendOptions#getPartitionId() getPartitionId} is set and is not + * valid. + * @throws IllegalStateException if the producer was closed while queueing an event. */ public Mono enqueueEvents(Iterable events, SendOptions options) { - return null; + if (events == null) { + return monoError(logger, new NullPointerException("'eventData' cannot be null.")); + } else if (options == null) { + return monoError(logger, new NullPointerException("'options' cannot be null.")); + } + + final List> enqueued = StreamSupport.stream(events.spliterator(), false) + .map(event -> enqueueEvent(event, options)) + .collect(Collectors.toList()); + + // concat subscribes to each publisher in sequence, so the last value will be the latest. + return Flux.concat(enqueued).last(); } /** @@ -244,7 +343,11 @@ public Mono enqueueEvents(Iterable events, SendOptions optio * @return A mono that completes when the buffers are empty. 
*/ public Mono flush() { - return null; + final List> flushOperations = partitionProducers.values().stream() + .map(value -> value.flush()) + .collect(Collectors.toList()); + + return Flux.merge(flushOperations).then(); } /** @@ -252,6 +355,11 @@ public Mono flush() { */ @Override public void close() { + if (isClosed.getAndSet(true)) { + return; + } + + partitionProducers.values().forEach(partitionProducer -> partitionProducer.close()); client.close(); } @@ -264,12 +372,12 @@ static class BufferedProducerClientOptions { private int maxConcurrentSendsPerPartition = 1; private int maxEventBufferLengthPerPartition = 1500; - private Duration maxWaitTime; + private Duration maxWaitTime = Duration.ofSeconds(30); private Consumer sendFailedContext; private Consumer sendSucceededContext; - private int maxConcurrentSends; + private int maxConcurrentSends = 1; - boolean isEnableIdempotentRetries() { + boolean enableIdempotentRetries() { return enableIdempotentRetries; } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClient.java index 521078c0bf64..51b43c6957bc 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClient.java @@ -4,8 +4,13 @@ package com.azure.messaging.eventhubs; +import com.azure.core.annotation.ReturnType; import com.azure.core.annotation.ServiceClient; +import com.azure.core.annotation.ServiceMethod; +import com.azure.core.util.IterableStream; +import com.azure.messaging.eventhubs.models.SendOptions; +import java.io.Closeable; import java.time.Duration; import java.util.function.Consumer; @@ -15,9 +20,9 @@ * specified partition key, or assigned a specifically requested 
partition. * *

- * The {@link EventHubBufferedProducerClient} does not publish immediately, instead using a deferred model where - * events are collected into a buffer so that they may be efficiently batched and published when the batch is full or - * the {@link EventHubBufferedProducerClientBuilder#maxWaitTime(Duration) maxWaitTime} has elapsed with no new events + * The {@link EventHubBufferedProducerClient} does not publish immediately, instead using a deferred model where events + * are collected into a buffer so that they may be efficiently batched and published when the batch is full or the + * {@link EventHubBufferedProducerClientBuilder#maxWaitTime(Duration) maxWaitTime} has elapsed with no new events * enqueued. *

*

@@ -41,5 +46,197 @@ *

*/ @ServiceClient(builder = EventHubBufferedProducerClientBuilder.class, isAsync = false) -public final class EventHubBufferedProducerClient { +public final class EventHubBufferedProducerClient implements Closeable { + private final EventHubBufferedProducerAsyncClient client; + private final Duration operationTimeout; + + EventHubBufferedProducerClient(EventHubBufferedProducerAsyncClient asyncClient, Duration operationTimeout) { + this.client = asyncClient; + this.operationTimeout = operationTimeout; + } + + /** + * Gets the fully qualified Event Hubs namespace that the connection is associated with. This is likely similar to + * {@code {yournamespace}.servicebus.windows.net}. + * + * @return The fully qualified Event Hubs namespace that the connection is associated with + */ + public String getFullyQualifiedNamespace() { + return client.getFullyQualifiedNamespace(); + } + + /** + * Gets the Event Hub name this client interacts with. + * + * @return The Event Hub name this client interacts with. + */ + public String getEventHubName() { + return client.getEventHubName(); + } + + /** + * Retrieves information about an Event Hub, including the number of partitions present and their identifiers. + * + * @return The set of information for the Event Hub that this client is associated with. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public EventHubProperties getEventHubProperties() { + return client.getEventHubProperties().block(operationTimeout); + } + + /** + * Retrieves the identifiers for the partitions of an Event Hub. + * + * @return A stream of identifiers for the partitions of an Event Hub. + */ + @ServiceMethod(returns = ReturnType.COLLECTION) + public IterableStream getPartitionIds() { + return new IterableStream<>(client.getPartitionIds()); + } + + /** + * Retrieves information about a specific partition for an Event Hub, including elements that describe the available + * events in the partition event stream. 
+ * + * @param partitionId The unique identifier of a partition associated with the Event Hub. + * + * @return The set of information for the requested partition under the Event Hub this client is associated with. + * + * @throws NullPointerException if {@code partitionId} is null. + * @throws IllegalArgumentException if {@code partitionId} is empty. + */ + @ServiceMethod(returns = ReturnType.SINGLE) + public PartitionProperties getPartitionProperties(String partitionId) { + return client.getPartitionProperties(partitionId).block(operationTimeout); + } + + /** + * Gets the total number of events that are currently buffered and waiting to be published, across all partitions. + * + * @return The total number of events that are currently buffered and waiting to be published, across all + * partitions. + */ + public int getBufferedEventCount() { + return client.getBufferedEventCount(); + } + + /** + * Gets the number of events that are buffered and waiting to be published for a given partition. + * + * @param partitionId The partition identifier. + * + * @return The number of events that are buffered and waiting to be published for a given partition. + * + * @throws NullPointerException if {@code partitionId} is null. + * @throws IllegalArgumentException if {@code partitionId} is empty. + */ + public int getBufferedEventCount(String partitionId) { + return client.getBufferedEventCount(partitionId); + } + + /** + * Enqueues an {@link EventData} into the buffer to be published to the Event Hub. If there is no capacity in the + * buffer when this method is invoked, it will wait for space to become available and ensure that the {@code + * eventData} has been enqueued. + * + * When this call returns, the {@code eventData} has been accepted into the buffer, but it may not have been + * published yet. Publishing will take place at a nondeterministic point in the future as the buffer is processed. 
+ * + * @param eventData The event to be enqueued into the buffer and, later, published. + * + * @return The total number of events that are currently buffered and waiting to be published, across all + * partitions. + * + * @throws NullPointerException if {@code eventData} is null. + * @throws IllegalStateException if the producer was closed while queueing an event. + */ + public Integer enqueueEvent(EventData eventData) { + return client.enqueueEvent(eventData).block(operationTimeout); + } + + /** + * Enqueues an {@link EventData} into the buffer to be published to the Event Hub. If there is no capacity in the + * buffer when this method is invoked, it will wait for space to become available and ensure that the {@code + * eventData} has been enqueued. + * + * When this call returns, the {@code eventData} has been accepted into the buffer, but it may not have been + * published yet. Publishing will take place at a nondeterministic point in the future as the buffer is processed. + * + * @param eventData The event to be enqueued into the buffer and, later, published. + * @param options The set of options to apply when publishing this event. If partitionKey and partitionId are + * not set, then the event is distributed round-robin amongst all the partitions. + * + * @return The total number of events that are currently buffered and waiting to be published, across all + * partitions. + * + * @throws NullPointerException if {@code eventData} or {@code options} is null. + * @throws IllegalArgumentException if {@link SendOptions#getPartitionId() getPartitionId} is set and is not + * valid. + * @throws IllegalStateException if the producer was closed while queueing an event. + */ + public Integer enqueueEvent(EventData eventData, SendOptions options) { + return client.enqueueEvent(eventData, options).block(operationTimeout); + } + + /** + * Enqueues a set of {@link EventData} into the buffer to be published to the Event Hub. 
If there is insufficient + * capacity in the buffer when this method is invoked, it will wait for space to become available and ensure that + * all EventData in the {@code events} set have been enqueued. + * + * When this call returns, the {@code events} have been accepted into the buffer, but it may not have been published + * yet. Publishing will take place at a nondeterministic point in the future as the buffer is processed. + * + * @param events The set of events to be enqueued into the buffer and, later, published. + * + * @return The total number of events that are currently buffered and waiting to be published, across all + * partitions. + * + * @throws NullPointerException if {@code events} is null. + * @throws IllegalStateException if the producer was closed while queueing an event. + */ + public Integer enqueueEvents(Iterable events) { + return client.enqueueEvents(events).block(operationTimeout); + } + + /** + * Enqueues a set of {@link EventData} into the buffer to be published to the Event Hub. If there is insufficient + * capacity in the buffer when this method is invoked, it will wait for space to become available and ensure that + * all EventData in the {@code events} set have been enqueued. + * + * When this call returns, the {@code events} have been accepted into the buffer, but it may not have been published + * yet. Publishing will take place at a nondeterministic point in the future as the buffer is processed. + * + * @param events The set of events to be enqueued into the buffer and, later, published. + * @param options The set of options to apply when publishing this event. + * + * @return The total number of events that are currently buffered and waiting to be published, across all + * partitions. + * + * @throws NullPointerException if {@code eventData} or {@code options} is null. + * @throws IllegalArgumentException if {@link SendOptions#getPartitionId() getPartitionId} is set and is not + * valid. 
+ * @throws IllegalStateException if the producer was closed while queueing an event. + */ + public Integer enqueueEvents(Iterable events, SendOptions options) { + return client.enqueueEvents(events, options).block(operationTimeout); + } + + /** + * Attempts to publish all events in the buffer immediately. This may result in multiple batches being published, + * the outcome of each of which will be individually reported by the {@link EventHubBufferedProducerClientBuilder#onSendBatchFailed(Consumer)} + * and {@link EventHubBufferedProducerClientBuilder#onSendBatchSucceeded(Consumer)} handlers. + * + * Upon completion of this method, the buffer will be empty. + */ + public void flush() { + client.flush().block(operationTimeout.plus(operationTimeout)); + } + + /** + * Disposes of the producer and all its resources. + */ + @Override + public void close() { + client.close(); + } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClientBuilder.java index 4fcf0e163fdc..b1f431c4ccea 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClientBuilder.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClientBuilder.java @@ -14,11 +14,13 @@ import com.azure.core.exception.AzureException; import com.azure.core.util.ClientOptions; import com.azure.core.util.Configuration; +import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.models.SendBatchFailedContext; import com.azure.messaging.eventhubs.models.SendBatchSucceededContext; import java.net.URL; import java.time.Duration; +import java.util.Objects; import java.util.function.Consumer; import static 
com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions; @@ -33,11 +35,22 @@ serviceClients = {EventHubBufferedProducerAsyncClient.class, EventHubBufferedProducerClient.class}, protocol = ServiceClientProtocol.AMQP) public final class EventHubBufferedProducerClientBuilder { + private static final ClientLogger LOGGER = new ClientLogger(EventHubBufferedProducerClientBuilder.class); + private final EventHubClientBuilder builder; private final BufferedProducerClientOptions clientOptions = new BufferedProducerClientOptions(); + private final PartitionResolver partitionResolver = new PartitionResolver(); + private AmqpRetryOptions retryOptions; /** - * Creates a new instance with the default transport {@link AmqpTransportType#AMQP}. + * Creates a new instance with the following defaults: + *
+     * <ul>
+     * <li>{@link #maxEventBufferLengthPerPartition(int)} is 1500</li>
+     * <li>{@link #transportType(AmqpTransportType)} is {@link AmqpTransportType#AMQP}</li>
+     * <li>{@link #maxConcurrentSendsPerPartition(int)} is 1</li>
+     * <li>{@link #maxConcurrentSends(int)} is 1</li>
+     * <li>{@link #maxWaitTime(Duration)} is 30 seconds</li>
+     * </ul>
*/ public EventHubBufferedProducerClientBuilder() { builder = new EventHubClientBuilder(); @@ -220,7 +233,7 @@ public EventHubBufferedProducerClientBuilder customEndpointAddress(String custom * * @return The updated {@link EventHubBufferedProducerClientBuilder} object. */ - public EventHubBufferedProducerClientBuilder enableIdempotentRetries(boolean enableIdempotentRetries) { + EventHubBufferedProducerClientBuilder enableIdempotentRetries(boolean enableIdempotentRetries) { clientOptions.setEnableIdempotentRetries(enableIdempotentRetries); return this; } @@ -229,8 +242,8 @@ public EventHubBufferedProducerClientBuilder enableIdempotentRetries(boolean ena * The total number of batches that may be sent concurrently, across all partitions. This limit takes precedence * over the value specified in {@link #maxConcurrentSendsPerPartition(int) maxConcurrentSendsPerPartition}, ensuring * this maximum is respected. When batches for the same partition are published concurrently, the ordering of - * events is not guaranteed. If the order events are published must be maintained, - * {@link #maxConcurrentSendsPerPartition(int) maxConcurrentSendsPerPartition} should not exceed 1. + * events is not guaranteed. If the order events are published must be maintained, {@link + * #maxConcurrentSendsPerPartition(int) maxConcurrentSendsPerPartition} should not exceed 1. * *

* By default, this will be set to the number of processors available in the host environment. @@ -240,20 +253,21 @@ public EventHubBufferedProducerClientBuilder enableIdempotentRetries(boolean ena * * @return The updated {@link EventHubBufferedProducerClientBuilder} object. */ - public EventHubBufferedProducerClientBuilder maxConcurrentSends(int maxConcurrentSends) { + EventHubBufferedProducerClientBuilder maxConcurrentSends(int maxConcurrentSends) { clientOptions.setMaxConcurrentSends(maxConcurrentSends); return this; } /** - * The number of batches that may be sent concurrently for a given partition. This option is superseded by - * the value specified for {@link #maxConcurrentSends(int) maxConcurrrentSends}, ensuring that limit is respected. + * The number of batches that may be sent concurrently for a given partition. This option is superseded by the + * value specified for {@link #maxConcurrentSends(int) maxConcurrrentSends}, ensuring that limit is respected. * - * @param maxConcurrentSendsPerPartition The number of batches that may be sent concurrently for a given partition. + * @param maxConcurrentSendsPerPartition The number of batches that may be sent concurrently for a given + * partition. * * @return The updated {@link EventHubBufferedProducerClientBuilder} object. */ - public EventHubBufferedProducerClientBuilder maxConcurrentSendsPerPartition(int maxConcurrentSendsPerPartition) { + EventHubBufferedProducerClientBuilder maxConcurrentSendsPerPartition(int maxConcurrentSendsPerPartition) { clientOptions.setMaxConcurrentSendsPerPartition(maxConcurrentSendsPerPartition); return this; } @@ -320,7 +334,6 @@ public EventHubBufferedProducerClientBuilder onSendBatchSucceeded( * * @param proxyOptions The proxy configuration to use. * - * * @return The updated {@link EventHubBufferedProducerClientBuilder} object. 
*/ public EventHubBufferedProducerClientBuilder proxyOptions(ProxyOptions proxyOptions) { @@ -336,6 +349,7 @@ public EventHubBufferedProducerClientBuilder proxyOptions(ProxyOptions proxyOpti * @return The updated {@link EventHubBufferedProducerClientBuilder} object. */ public EventHubBufferedProducerClientBuilder retryOptions(AmqpRetryOptions retryOptions) { + this.retryOptions = retryOptions; builder.retryOptions(retryOptions); return this; } @@ -353,14 +367,50 @@ public EventHubBufferedProducerClientBuilder transportType(AmqpTransportType tra return this; } - /** * Builds a new instance of the async buffered producer client. * * @return A new instance of {@link EventHubBufferedProducerAsyncClient}. + * + * @throws NullPointerException if {@link #onSendBatchSucceeded(Consumer)}, {@link + * #onSendBatchFailed(Consumer)}, or {@link #maxWaitTime(Duration)} are null. + * @throws IllegalArgumentException if {@link #maxConcurrentSends(int)}, {@link + * #maxConcurrentSendsPerPartition(int)}, or {@link #maxEventBufferLengthPerPartition(int)} are less than 1. 
*/ public EventHubBufferedProducerAsyncClient buildAsyncClient() { - return new EventHubBufferedProducerAsyncClient(builder, clientOptions); + + if (Objects.isNull(clientOptions.getSendSucceededContext())) { + throw LOGGER.logExceptionAsError(new NullPointerException("'onSendBatchSucceeded' cannot be null.")); + } + + if (Objects.isNull(clientOptions.getSendFailedContext())) { + throw LOGGER.logExceptionAsError(new NullPointerException("'onSendBatchFailed' cannot be null.")); + } + + if (Objects.isNull(clientOptions.getMaxWaitTime())) { + throw LOGGER.logExceptionAsError(new NullPointerException("'maxWaitTime' cannot be null.")); + } + + if (clientOptions.getMaxEventBufferLengthPerPartition() < 1) { + throw LOGGER.logExceptionAsError(new IllegalArgumentException( + "'maxEventBufferLengthPerPartition' cannot be less than 1.")); + } + + if (clientOptions.getMaxConcurrentSends() < 1) { + throw LOGGER.logExceptionAsError(new IllegalArgumentException( + "'maxConcurrentSends' cannot be less than 1.")); + } + + if (clientOptions.getMaxConcurrentSendsPerPartition() < 1) { + throw LOGGER.logExceptionAsError(new IllegalArgumentException( + "'maxConcurrentSendsPerPartition' cannot be less than 1.")); + } + + final AmqpRetryOptions options = retryOptions == null + ? EventHubClientBuilder.DEFAULT_RETRY + : retryOptions; + + return new EventHubBufferedProducerAsyncClient(builder, clientOptions, partitionResolver, options); } /** @@ -369,6 +419,10 @@ public EventHubBufferedProducerAsyncClient buildAsyncClient() { * @return A new instance of {@link EventHubBufferedProducerClient}. */ public EventHubBufferedProducerClient buildClient() { - return null; + final AmqpRetryOptions options = retryOptions == null + ? 
EventHubClientBuilder.DEFAULT_RETRY + : retryOptions; + + return new EventHubBufferedProducerClient(buildAsyncClient(), options.getTryTimeout()); } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java index 55cd67314eb2..6fdb45e78ce2 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java @@ -158,6 +158,9 @@ public class EventHubClientBuilder implements // So, limit the prefetch to just 1 by default. static final int DEFAULT_PREFETCH_COUNT_FOR_SYNC_CLIENT = 1; + static final AmqpRetryOptions DEFAULT_RETRY = new AmqpRetryOptions() + .setTryTimeout(ClientConstants.OPERATION_TIMEOUT); + /** * The name of the default consumer group in the Event Hubs service. 
*/ @@ -177,8 +180,6 @@ public class EventHubClientBuilder implements private static final String UNKNOWN = "UNKNOWN"; private static final String AZURE_EVENT_HUBS_CONNECTION_STRING = "AZURE_EVENT_HUBS_CONNECTION_STRING"; - private static final AmqpRetryOptions DEFAULT_RETRY = new AmqpRetryOptions() - .setTryTimeout(ClientConstants.OPERATION_TIMEOUT); private static final Pattern HOST_PORT_PATTERN = Pattern.compile("^[^:]+:\\d+"); private static final ClientLogger LOGGER = new ClientLogger(EventHubClientBuilder.class); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ClientConstants.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ClientConstants.java index 80d8ba4bee7b..c18499b906bb 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ClientConstants.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ClientConstants.java @@ -20,6 +20,7 @@ public final class ClientConstants { public static final String ENTITY_PATH_KEY = "entityPath"; public static final String SIGNAL_TYPE_KEY = "signalType"; public static final String CLIENT_IDENTIFIER_KEY = "clientIdentifier"; + public static final String EMIT_RESULT_KEY = "emitResult"; // EventHubs specific logging context keys public static final String PARTITION_ID_KEY = "partitionId"; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java index 60f03efaa9e3..c50f9d4b94a6 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java 
@@ -30,6 +30,7 @@ public class EventDataAggregatorTest { private static final String NAMESPACE = "test.namespace"; + private static final String PARTITION_ID = "test-id"; private AutoCloseable mockCloseable; @@ -92,7 +93,7 @@ public void pushesFullBatchesDownstream() { }; final TestPublisher publisher = TestPublisher.createCold(); - final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options); + final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options, PARTITION_ID); // Act & Assert StepVerifier.create(aggregator) @@ -129,7 +130,7 @@ public void pushesBatchesAndError() { }; final TestPublisher publisher = TestPublisher.createCold(); - final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options); + final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options, PARTITION_ID); final IllegalArgumentException testException = new IllegalArgumentException("Test exception."); // Act & Assert @@ -158,10 +159,9 @@ public void pushesBatchAfterMaxTime() { setupBatchMock(batch, batchEvents, event1, event2); final List batchEvents2 = new ArrayList<>(); - setupBatchMock(batch2, batchEvents2, event3); + setupBatchMock(batch2, batchEvents2, event1, event2, event3); final Duration waitTime = Duration.ofSeconds(5); - final Duration halfWaitTime = waitTime.minusSeconds(2); final BufferedProducerClientOptions options = new BufferedProducerClientOptions(); options.setMaxWaitTime(waitTime); @@ -181,15 +181,12 @@ public void pushesBatchAfterMaxTime() { }; final TestPublisher publisher = TestPublisher.createCold(); - final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options); + final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options, PARTITION_ID); // Act & Assert 
StepVerifier.create(aggregator) - .then(() -> publisher.next(event1)) - .thenAwait(halfWaitTime) - .then(() -> { - assertEquals(1, batchEvents.size()); - + .then(() -> { + publisher.next(event1); publisher.next(event2); }) .thenAwait(waitTime) @@ -197,12 +194,6 @@ public void pushesBatchAfterMaxTime() { assertEquals(b, batch); assertEquals(2, batchEvents.size()); }) - .expectNoEvent(waitTime) - .then(() -> publisher.next(event3)) - .thenAwait(waitTime) - .assertNext(e -> { - assertEquals(e, batch2, "Should be equal."); - }) .thenCancel() .verify(); } @@ -230,7 +221,7 @@ public void errorsOnEventThatDoesNotFit() { }; final TestPublisher publisher = TestPublisher.createCold(); - final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options); + final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options, PARTITION_ID); StepVerifier.create(aggregator) .then(() -> { @@ -274,7 +265,7 @@ public void respectsBackpressure() { }; final TestPublisher publisher = TestPublisher.createCold(); - final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options); + final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options, PARTITION_ID); final long request = 1L; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java index 72546d4e0f8f..b3c20b7d3f44 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java @@ -3,6 +3,7 @@ package com.azure.messaging.eventhubs; +import 
com.azure.core.amqp.AmqpRetryOptions; import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions; import com.azure.messaging.eventhubs.models.CreateBatchOptions; import com.azure.messaging.eventhubs.models.SendBatchFailedContext; @@ -10,6 +11,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Isolated; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; @@ -37,11 +39,13 @@ /** * Tests {@link EventHubBufferedPartitionProducer} */ +@Isolated public class EventHubBufferedPartitionProducerTest { private static final String PARTITION_ID = "10"; private static final String NAMESPACE = "test-eventhubs-namespace"; private static final String EVENT_HUB_NAME = "test-hub"; private static final List PARTITION_IDS = Arrays.asList("one", "two", PARTITION_ID, "four"); + private static final AmqpRetryOptions DEFAULT_RETRY_OPTIONS = new AmqpRetryOptions(); private AutoCloseable mockCloseable; @@ -122,14 +126,16 @@ public void publishesEvents() throws InterruptedException { when(client.send(any(EventDataBatch.class))).thenReturn(Mono.empty()); final EventHubBufferedPartitionProducer producer = new EventHubBufferedPartitionProducer(client, PARTITION_ID, - options); + options, DEFAULT_RETRY_OPTIONS); // Act & Assert StepVerifier.create(producer.enqueueEvent(event1)) - .verifyComplete(); + .expectComplete() + .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout()); StepVerifier.create(producer.enqueueEvent(event2)) - .verifyComplete(); + .expectComplete() + .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout()); assertTrue(successSemaphore.tryAcquire(waitTime.toMillis(), TimeUnit.MILLISECONDS), "Should have been able to get a successful batch pushed downstream."); @@ -170,15 +176,17 @@ public void publishesErrors() throws InterruptedException { when(client.send(any(EventDataBatch.class))).thenReturn(Mono.empty(), 
Mono.error(error)); final EventHubBufferedPartitionProducer producer = new EventHubBufferedPartitionProducer(client, PARTITION_ID, - options); + options, DEFAULT_RETRY_OPTIONS); // Act & Assert StepVerifier.create(Mono.when(producer.enqueueEvent(event1), producer.enqueueEvent(event2))) - .verifyComplete(); + .expectComplete() + .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout()); StepVerifier.create(Mono.when(producer.enqueueEvent(event3))) .thenAwait(options.getMaxWaitTime()) - .verifyComplete(); + .expectComplete() + .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout()); assertTrue(successSemaphore.tryAcquire(waitTime.toMillis(), TimeUnit.MILLISECONDS), "Should have been able to get a successful signal downstream."); @@ -238,19 +246,22 @@ public void canPublishAfterErrors() throws InterruptedException { }); final EventHubBufferedPartitionProducer producer = new EventHubBufferedPartitionProducer(client, PARTITION_ID, - options); + options, DEFAULT_RETRY_OPTIONS); // Act & Assert StepVerifier.create(Mono.when(producer.enqueueEvent(event1), producer.enqueueEvent(event2))) - .verifyComplete(); + .expectComplete() + .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout()); StepVerifier.create(producer.enqueueEvent(event3)) .thenAwait(options.getMaxWaitTime()) - .verifyComplete(); + .expectComplete() + .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout()); StepVerifier.create(Mono.when(producer.enqueueEvent(event4), producer.enqueueEvent(event5))) .thenAwait(options.getMaxWaitTime()) - .verifyComplete(); + .expectComplete() + .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout()); assertTrue(success.await(waitTime.toMillis(), TimeUnit.MILLISECONDS), "Should have been able to get a successful signal downstream."); @@ -276,12 +287,12 @@ public void canPublishAfterErrors() throws InterruptedException { @Test public void getBufferedEventCounts() throws InterruptedException { // Arrange - final CountDownLatch success = new CountDownLatch(2); + final CountDownLatch success = new CountDownLatch(1); 
failedSemaphore.acquire(); final InvocationHolder holder = new InvocationHolder(); final BufferedProducerClientOptions options = new BufferedProducerClientOptions(); - options.setMaxWaitTime(Duration.ofSeconds(3)); + options.setMaxWaitTime(Duration.ofSeconds(5)); options.setSendSucceededContext(context -> { System.out.println("Batch received."); holder.onSucceed(context); @@ -289,8 +300,6 @@ public void getBufferedEventCounts() throws InterruptedException { }); options.setSendFailedContext(context -> holder.onFailed(context)); - final Duration waitTime = options.getMaxWaitTime().plus(options.getMaxWaitTime()); - final List batchEvents = new ArrayList<>(); setupBatchMock(batch, batchEvents, event1); @@ -301,14 +310,22 @@ public void getBufferedEventCounts() throws InterruptedException { final EventData event5 = new EventData("five"); setupBatchMock(batch3, batchEvents3, event4, event5); + final List batchEvents4 = new ArrayList<>(); + setupBatchMock(batch4, batchEvents4, event2, event3, event4, event5); + + final List batchEvents5 = new ArrayList<>(); + setupBatchMock(batch5, batchEvents5, event2, event3, event4, event5); + // Delaying send operation. - when(client.send(any(EventDataBatch.class))).thenAnswer(invocation -> Mono.delay(options.getMaxWaitTime()).then()); + when(client.send(any(EventDataBatch.class))) + .thenAnswer(invocation -> Mono.delay(options.getMaxWaitTime()).then()); final EventHubBufferedPartitionProducer producer = new EventHubBufferedPartitionProducer(client, PARTITION_ID, - options); + options, DEFAULT_RETRY_OPTIONS); // Act & Assert - StepVerifier.create(Mono.when(producer.enqueueEvent(event1), producer.enqueueEvent(event2), producer.enqueueEvent(event3)), 1L) + StepVerifier.create(Mono.when(producer.enqueueEvent(event1), producer.enqueueEvent(event2), + producer.enqueueEvent(event3)), 1L) .then(() -> { // event1 was enqueued, event2 is in a batch, and event3 is currently in the queue waiting to be // pushed downstream. 
@@ -317,25 +334,29 @@ public void getBufferedEventCounts() throws InterruptedException { final int bufferedEventCount = producer.getBufferedEventCount(); assertEquals(1, bufferedEventCount); }) - .verifyComplete(); + .expectComplete() + .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout()); StepVerifier.create(Mono.when(producer.enqueueEvent(event4), producer.enqueueEvent(event5))) - .verifyComplete(); + .thenAwait(options.getMaxWaitTime()) + .expectComplete() + .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout()); - final long totalTime = waitTime.toMillis() + waitTime.toMillis(); - assertTrue(success.await(totalTime, TimeUnit.MILLISECONDS), + assertTrue(success.await(DEFAULT_RETRY_OPTIONS.getTryTimeout().toMillis(), TimeUnit.MILLISECONDS), "Should have been able to get a successful signal downstream."); - assertEquals(2, holder.succeededContexts.size()); + assertTrue(1 <= holder.succeededContexts.size(), "Expected at least 1 succeeded contexts. Actual: " + holder.succeededContexts.size()); // Verify the completed ones. 
final SendBatchSucceededContext first = holder.succeededContexts.get(0); assertEquals(PARTITION_ID, first.getPartitionId()); assertEquals(batchEvents, first.getEvents()); - final SendBatchSucceededContext second = holder.succeededContexts.get(1); - assertEquals(PARTITION_ID, second.getPartitionId()); - assertEquals(batchEvents2, second.getEvents()); + if (holder.succeededContexts.size() > 1) { + final SendBatchSucceededContext second = holder.succeededContexts.get(1); + assertEquals(PARTITION_ID, second.getPartitionId()); + assertEquals(batchEvents2, second.getEvents()); + } } private class InvocationHolder { diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClientIntegrationTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClientIntegrationTest.java new file mode 100644 index 000000000000..f7734ea1533d --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClientIntegrationTest.java @@ -0,0 +1,272 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package com.azure.messaging.eventhubs; + +import com.azure.core.util.logging.ClientLogger; +import com.azure.messaging.eventhubs.models.SendBatchSucceededContext; +import com.azure.messaging.eventhubs.models.SendOptions; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Isolated; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * Tests for {@link EventHubBufferedProducerAsyncClient}. 
+ */ +@Isolated +@Tag(TestUtils.INTEGRATION) +public class EventHubBufferedProducerAsyncClientIntegrationTest extends IntegrationTestBase { + private final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss") + .withLocale(Locale.US) + .withZone(ZoneId.of("America/Los_Angeles")); + private EventHubBufferedProducerAsyncClient producer; + private EventHubClient hubClient; + private String[] partitionIds; + private final Map partitionPropertiesMap = new HashMap<>(); + + public EventHubBufferedProducerAsyncClientIntegrationTest() { + super(new ClientLogger(EventHubBufferedProducerAsyncClientIntegrationTest.class)); + } + + @Override + protected void beforeTest() { + this.hubClient = new EventHubClientBuilder().connectionString(getConnectionString()) + .buildClient(); + + List allIds = new ArrayList<>(); + final EventHubProperties properties = hubClient.getProperties(); + + properties.getPartitionIds().forEach(id -> { + allIds.add(id); + + final PartitionProperties partitionProperties = hubClient.getPartitionProperties(id); + partitionPropertiesMap.put(id, partitionProperties); + }); + + this.partitionIds = allIds.toArray(new String[0]); + + assertFalse(partitionPropertiesMap.isEmpty(), "'partitionPropertiesMap' should have values."); + } + + @Override + protected void afterTest() { + if (hubClient != null) { + hubClient.close(); + } + + if (producer != null) { + producer.close(); + } + } + + /** + * Checks that we can publish round-robin. + * + * @throws InterruptedException If the semaphore cannot be awaited. 
+ */ + @Test + public void publishRoundRobin() throws InterruptedException { + // Arrange + final CountDownLatch countDownLatch = new CountDownLatch(partitionPropertiesMap.size()); + final AtomicBoolean anyFailures = new AtomicBoolean(false); + + final Duration maxWaitTime = Duration.ofSeconds(5); + final int queueSize = 10; + + producer = new EventHubBufferedProducerClientBuilder() + .connectionString(getConnectionString()) + .retryOptions(RETRY_OPTIONS) + .onSendBatchFailed(failed -> { + anyFailures.set(true); + fail("Exception occurred while sending messages." + failed.getThrowable()); + }) + .onSendBatchSucceeded(succeeded -> { + countDownLatch.countDown(); + }) + .maxEventBufferLengthPerPartition(queueSize) + .maxWaitTime(maxWaitTime) + .buildAsyncClient(); + + // Creating 2x number of events, we expect that each partition will get at least one of these events. + final int numberOfEvents = partitionPropertiesMap.size() * 2; + final List eventsToPublish = IntStream.range(0, numberOfEvents) + .mapToObj(index -> new EventData(String.valueOf(index))) + .collect(Collectors.toList()); + + // Waiting for at least maxWaitTime because events will get published by then. 
+ StepVerifier.create(producer.enqueueEvents(eventsToPublish)) + .assertNext(integer -> { + assertEquals(0, integer, "Do not expect anymore events in queue."); + }) + .thenAwait(maxWaitTime) + .expectComplete() + .verify(TIMEOUT); + + assertTrue(countDownLatch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS), "Did not get enough messages."); + + // Assert + final Map propertiesAfterMap = producer.getEventHubProperties() + .flatMapMany(properties -> { + return Flux.fromIterable(properties.getPartitionIds()) + .flatMap(id -> producer.getPartitionProperties(id)); + }) + .collectMap(properties -> properties.getId(), Function.identity()) + .block(TIMEOUT); + + assertNotNull(propertiesAfterMap, "'partitionPropertiesMap' should not be null"); + + assertFalse(anyFailures.get(), "Should not have encountered any failures."); + assertTrue(countDownLatch.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS), + "Should have sent x batches where x is the number of partitions."); + + // Check that the offsets have increased because we have published some events. + assertPropertiesUpdated(partitionPropertiesMap, propertiesAfterMap); + } + + /** + * Checks that sending an iterable with multiple partition keys is successful. + */ + @Test + public void publishWithPartitionKeys() throws InterruptedException { + // Arrange + final int numberOfEvents = partitionPropertiesMap.size() * 4; + + final AtomicBoolean anyFailures = new AtomicBoolean(false); + final List succeededContexts = new ArrayList<>(); + final CountDownLatch eventCountdown = new CountDownLatch(numberOfEvents); + + final Duration maxWaitTime = Duration.ofSeconds(15); + final int queueSize = 10; + + producer = new EventHubBufferedProducerClientBuilder() + .connectionString(getConnectionString()) + .retryOptions(RETRY_OPTIONS) + .onSendBatchFailed(failed -> { + anyFailures.set(true); + fail("Exception occurred while sending messages." 
+ failed.getThrowable()); + }) + .onSendBatchSucceeded(succeeded -> { + succeededContexts.add(succeeded); + succeeded.getEvents().forEach(e -> eventCountdown.countDown()); + }) + .maxEventBufferLengthPerPartition(queueSize) + .maxWaitTime(maxWaitTime) + .buildAsyncClient(); + + final Random randomInterval = new Random(10); + final Map> expectedPartitionIdsMap = new HashMap<>(); + final PartitionResolver resolver = new PartitionResolver(); + + final List> publishEventMono = IntStream.range(0, numberOfEvents) + .mapToObj(index -> { + final String partitionKey = "partition-" + index; + final EventData eventData = new EventData(partitionKey); + final SendOptions sendOptions = new SendOptions().setPartitionKey(partitionKey); + final int delay = randomInterval.nextInt(20); + + final String expectedPartitionId = resolver.assignForPartitionKey(partitionKey, partitionIds); + + expectedPartitionIdsMap.compute(expectedPartitionId, (key, existing) -> { + if (existing == null) { + List events = new ArrayList<>(); + events.add(partitionKey); + return events; + } else { + existing.add(partitionKey); + return existing; + } + }); + + return Mono.delay(Duration.ofSeconds(delay)).then(producer.enqueueEvent(eventData, sendOptions) + .doFinally(signal -> { + System.out.printf("\t[%s] %s Published event.%n", expectedPartitionId, formatter.format(Instant.now())); + })); + }).collect(Collectors.toList()); + + // Waiting for at least maxWaitTime because events will get published by then. 
+ StepVerifier.create(Mono.when(publishEventMono)) + .expectComplete() + .verify(TIMEOUT); + + final boolean await = eventCountdown.await(TIMEOUT.toMillis(), TimeUnit.MILLISECONDS); + + assertFalse(anyFailures.get(), "Should not have encountered any failures."); + assertFalse(succeededContexts.isEmpty(), "Should have successfully sent some messages."); + + for (SendBatchSucceededContext context : succeededContexts) { + final List expected = expectedPartitionIdsMap.get(context.getPartitionId()); + assertNotNull(expected, "Did not find any expected for partitionId: " + context.getPartitionId()); + + context.getEvents().forEach(eventData -> { + final boolean success = expected.removeIf(key -> key.equals(eventData.getBodyAsString())); + assertTrue(success, "Unable to find key " + eventData.getBodyAsString() + + " in partition id: " + context.getEvents()); + }); + } + + expectedPartitionIdsMap.forEach((key, value) -> { + assertTrue(value.isEmpty(), key + ": There should be no more partition keys. " + + String.join(",", value)); + }); + + final Map finalProperties = getPartitionProperties(); + assertPropertiesUpdated(partitionPropertiesMap, finalProperties); + } + + private Map getPartitionProperties() { + final EventHubProperties properties1 = this.hubClient.getProperties(); + final Map result = new HashMap<>(); + + properties1.getPartitionIds().forEach(id -> { + final PartitionProperties props = hubClient.getPartitionProperties(id); + result.put(id, props); + }); + + return result; + } + + private static void assertPropertiesUpdated(Map initial, + Map afterwards) { + + // Check that the offsets have increased because we have published some events. 
+ initial.forEach((key, before) -> { + final PartitionProperties after = afterwards.get(key); + + assertNotNull(after, "did not get properties for key: " + key); + assertEquals(before.getEventHubName(), after.getEventHubName()); + assertEquals(before.getId(), after.getId()); + + assertTrue(after.getLastEnqueuedTime().isAfter(before.getLastEnqueuedTime()), + "Last enqueued time should be newer"); + + assertTrue(before.getLastEnqueuedSequenceNumber() < after.getLastEnqueuedSequenceNumber(), + "Sequence number should be greater."); + }); + } +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientIntegrationTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientIntegrationTest.java index 4e4903b8d83c..e55da016714c 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientIntegrationTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientIntegrationTest.java @@ -42,6 +42,13 @@ protected void beforeTest() { .buildAsyncProducerClient(); } + @Override + protected void afterTest() { + if (producer != null) { + producer.close(); + } + } + /** * Verifies that we can create and send a message to an Event Hub partition. */