From 1a69f966369b6bd2380c4841d028f433ec60ad40 Mon Sep 17 00:00:00 2001 From: Connie Date: Mon, 18 Jul 2022 13:06:31 -0700 Subject: [PATCH 01/29] Updating property name to getMaxEventBufferLengthPerPartition --- .../eventhubs/EventHubBufferedProducerAsyncClient.java | 10 +++++----- .../EventHubBufferedProducerClientBuilder.java | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient.java index 3e137a0e3c62..b91926b203fe 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient.java @@ -255,7 +255,7 @@ static class BufferedProducerClientOptions { private boolean enableIdempotentRetries = false; private int maxConcurrentSendsPerPartition = 1; - private int maxPendingEventCount = 1500; + private int maxEventBufferLengthPerPartition = 1500; private Duration maxWaitTime; private Consumer sendFailedContext; private Consumer sendSucceededContext; @@ -285,12 +285,12 @@ void setMaxConcurrentSendsPerPartition(int maxConcurrentSendsPerPartition) { this.maxConcurrentSendsPerPartition = maxConcurrentSendsPerPartition; } - int getMaxPendingEventCount() { - return maxPendingEventCount; + int getMaxEventBufferLengthPerPartition() { + return maxEventBufferLengthPerPartition; } - void setMaxPendingEventCount(int maxPendingEventCount) { - this.maxPendingEventCount = maxPendingEventCount; + void maxEventBufferLengthPerPartition(int maxPendingEventCount) { + this.maxEventBufferLengthPerPartition = maxPendingEventCount; } Duration getMaxWaitTime() { diff --git 
a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClientBuilder.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClientBuilder.java index 0c7dfeba650a..4fcf0e163fdc 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClientBuilder.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerClientBuilder.java @@ -271,7 +271,7 @@ public EventHubBufferedProducerClientBuilder maxConcurrentSendsPerPartition(int * @return The updated {@link EventHubBufferedProducerClientBuilder} object. */ public EventHubBufferedProducerClientBuilder maxEventBufferLengthPerPartition(int maxEventBufferLengthPerPartition) { - clientOptions.setMaxPendingEventCount(maxEventBufferLengthPerPartition); + clientOptions.maxEventBufferLengthPerPartition(maxEventBufferLengthPerPartition); return this; } From 665de4c2e5ab16221a944777e8039150779cca1e Mon Sep 17 00:00:00 2001 From: Connie Date: Mon, 18 Jul 2022 13:21:49 -0700 Subject: [PATCH 02/29] Adds an aggregator that publishes events based on full batches and time. 
--- .../eventhubs/EventDataAggregator.java | 252 ++++++++++++++++++ 1 file changed, 252 insertions(+) create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java new file mode 100644 index 000000000000..6069c3188e2d --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java @@ -0,0 +1,252 @@ +package com.azure.messaging.eventhubs; + +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpErrorContext; +import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.util.logging.ClientLogger; +import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import reactor.core.CoreSubscriber; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxOperator; +import reactor.core.publisher.Operators; +import reactor.core.publisher.Sinks; + +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +class EventDataAggregator extends FluxOperator { + private static final ClientLogger LOGGER = new ClientLogger(EventDataAggregator.class); + private final AtomicReference downstreamSubscription = new AtomicReference<>(); + private final Supplier batchSupplier; + private final String namespace; + private final BufferedProducerClientOptions options; + + /** + * Build a {@link FluxOperator} wrapper around the passed parent 
{@link Publisher} + * + * @param source the {@link Publisher} to decorate + */ + EventDataAggregator(Flux source, Supplier batchSupplier, + String namespace, BufferedProducerClientOptions options) { + super(source); + + this.batchSupplier = batchSupplier; + this.namespace = namespace; + this.options = options; + } + + @Override + public void subscribe(CoreSubscriber actual) { + final EventDataAggregatorMain subscription = new EventDataAggregatorMain(actual, namespace, options, + batchSupplier, LOGGER); + + if (!downstreamSubscription.compareAndSet(null, subscription)) { + throw new IllegalArgumentException("Cannot resubscribe to multiple upstreams."); + } + + source.subscribeWith(subscription); + actual.onSubscribe(subscription); + } + + /** + * Main implementation class for subscribing to the upstream source and publishing events downstream. + */ + static class EventDataAggregatorMain implements Subscription, CoreSubscriber { + /** + * The number of requested EventDataBatches. + */ + private volatile long requested; + private static final AtomicLongFieldUpdater REQUESTED = + AtomicLongFieldUpdater.newUpdater(EventDataAggregatorMain.class, "requested"); + + private static final Duration MAX_TIME = Duration.ofMillis(Long.MAX_VALUE); + + private final Sinks.Many eventSink; + private final Disposable disposable; + + private final AtomicBoolean isCompleted = new AtomicBoolean(false); + private final CoreSubscriber downstream; + private final ClientLogger logger; + private final Supplier batchSupplier; + private final String namespace; + private final Object lock = new Object(); + + private Subscription subscription; + private EventDataBatch currentBatch; + + EventDataAggregatorMain(CoreSubscriber downstream, String namespace, + BufferedProducerClientOptions options, Supplier batchSupplier, ClientLogger logger) { + this.namespace = namespace; + this.downstream = downstream; + this.logger = logger; + this.batchSupplier = batchSupplier; + this.currentBatch = 
batchSupplier.get(); + + this.eventSink = Sinks.many().unicast().onBackpressureError(); + this.disposable = eventSink.asFlux() + .switchMap(value -> { + if (value == 0) { + return Flux.interval(MAX_TIME, MAX_TIME); + } else { + return Flux.interval(options.getMaxWaitTime(), options.getMaxWaitTime()); + } + }) + .subscribe(next -> { + logger.verbose("Time elapsed. Publishing batch."); + updateOrPublishBatch(null, true); + }); + } + + /** + * Request a number of {@link EventDataBatch}. + * + * @param n Number of batches requested. + */ + @Override + public void request(long n) { + if (!Operators.validate(n)) { + return; + } + + Operators.addCap(REQUESTED, this, n); + subscription.request(n); + } + + /** + * Cancels the subscription upstream. + */ + @Override + public void cancel() { + // Do not keep requesting more events upstream + subscription.cancel(); + + disposable.dispose(); + } + + @Override + public void onSubscribe(Subscription s) { + if (subscription != null) { + logger.warning("Subscription was already set. Cancelling existing subscription."); + subscription.cancel(); + } else { + subscription = s; + downstream.onSubscribe(this); + } + } + + @Override + public void onNext(EventData eventData) { + updateOrPublishBatch(eventData, false); + eventSink.emitNext(1L, Sinks.EmitFailureHandler.FAIL_FAST); + + final long left = REQUESTED.get(this); + if (left > 0) { + subscription.request(1L); + } + } + + @Override + public void onError(Throwable t) { + if (!isCompleted.compareAndSet(false, true)) { + Operators.onErrorDropped(t, downstream.currentContext()); + return; + } + + downstream.onError(t); + } + + /** + * Upstream signals a completion. + */ + @Override + public void onComplete() { + if (isCompleted.compareAndSet(false, true)) { + downstream.onComplete(); + } + } + + /** + * @param eventData EventData to add to or null if there are no events to add to the batch. + * @param alwaysPublish {@code true} to always push batch downstream. {@code false}, otherwise. 
+ */ + private void updateOrPublishBatch(EventData eventData, boolean alwaysPublish) { + if (alwaysPublish) { + publishDownstream(); + return; + } else if (eventData == null) { + return; + } + + boolean added; + synchronized (lock) { + added = currentBatch.tryAdd(eventData); + } + + if (added) { + return; + } + + publishDownstream(); + + synchronized (lock) { + added = currentBatch.tryAdd(eventData); + } + + if (!added) { + final AmqpException error = new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, + "EventData exceeded maximum size.", new AmqpErrorContext(namespace)); + + onError(error); + } + } + + /** + * Publishes batch downstream if there are events in the batch and updates it. + */ + private void publishDownstream() { + EventDataBatch previous = null; + + try { + synchronized (lock) { + previous = this.currentBatch; + + if (previous == null) { + logger.warning("Batch should not be null, setting a new batch."); + + this.currentBatch = batchSupplier.get(); + return; + } else if (previous.getEvents().isEmpty()) { + return; + } + + downstream.onNext(previous); + + final long batchesLeft = REQUESTED.updateAndGet(this, (v) -> { + if (v == Long.MAX_VALUE) { + return v; + } else { + return v - 1; + } + }); + + logger.verbose("Batch published. Requested batches left: {}", batchesLeft); + this.currentBatch = batchSupplier.get(); + } + } catch (Throwable e) { + final Throwable error = Operators.onNextError(previous, e, downstream.currentContext(), subscription); + + logger.warning("Unable to push batch downstream to publish.", error); + + if (error != null) { + onError(error); + } + } + } + } +} From 638a7365410dd30dfedfa237829ea6f27fac60b1 Mon Sep 17 00:00:00 2001 From: Connie Date: Mon, 18 Jul 2022 13:42:46 -0700 Subject: [PATCH 03/29] Adding implementation for publishing events for a partition. 
--- .../eventhubs/EventHubPartitionPublisher.java | 152 ++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionPublisher.java diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionPublisher.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionPublisher.java new file mode 100644 index 000000000000..e2623d914984 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionPublisher.java @@ -0,0 +1,152 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs; + +import com.azure.core.amqp.exception.AmqpErrorContext; +import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.util.logging.ClientLogger; +import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions; +import com.azure.messaging.eventhubs.models.SendBatchFailedContext; +import com.azure.messaging.eventhubs.models.SendBatchSucceededContext; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Schedulers; +import reactor.util.concurrent.Queues; +import reactor.util.retry.Retry; + +import java.io.Closeable; +import java.time.Duration; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +/** + * Keeps track of publishing events to a partition. 
+ */ +class EventHubPartitionPublisher implements Closeable { + private final ClientLogger logger; + private final EventHubProducerAsyncClient client; + private final String partitionId; + private final BufferedProducerClientOptions options; + private final AmqpErrorContext errorContext; + private final AtomicBoolean isClosed = new AtomicBoolean(false); + private final Mono linkSizeMono; + private final Disposable publishSubscription; + private final Sinks.Many eventSink; + + EventHubPartitionPublisher(EventHubProducerAsyncClient client, String partitionId, + BufferedProducerClientOptions options) { + this.client = client; + this.partitionId = partitionId; + this.options = options; + this.errorContext = new AmqpErrorContext(client.getFullyQualifiedNamespace()); + this.linkSizeMono = client.createBatch().map(batch -> batch.getMaxSizeInBytes()) + .publishOn(Schedulers.boundedElastic()) + .cache(value -> Duration.ofMillis(Long.MAX_VALUE), + error -> { + return Duration.ZERO; + }, () -> Duration.ZERO); + + this.logger = new ClientLogger(EventHubPartitionPublisher.class + "-" + partitionId); + + final Supplier> queueSupplier = Queues.get(options.getMaxEventBufferLengthPerPartition()); + this.eventSink = Sinks.many().unicast().onBackpressureBuffer(queueSupplier.get()); + + + final EventDataAggregator eventDataBatchFlux = new EventDataAggregator(eventSink.asFlux(), + () -> client.createBatch().subscribeOn(Schedulers.boundedElastic()).block(), + client.getFullyQualifiedNamespace(), options); + + this.publishSubscription = publishEvents(eventDataBatchFlux) + .publishOn(Schedulers.boundedElastic()) + .subscribe(result -> { + if (result.error == null) { + options.getSendSucceededContext().accept(new SendBatchSucceededContext(result.batch.getEvents(), + partitionId)); + } else { + options.getSendFailedContext().accept(new SendBatchFailedContext(result.batch.getEvents(), + partitionId, result.error)); + } + }, error -> { + logger.error("Publishing subscription completed and 
ended in an error.", error); + options.getSendFailedContext().accept(new SendBatchFailedContext(null, partitionId, error)); + }, + () -> { + logger.info("Publishing subscription completed."); + }); + + } + + Mono enqueueEvent(EventData eventData) { + return Mono.create(sink -> { + sink.onRequest(request -> { + try { + eventSink.emitNext(eventData, (signalType, emitResult) -> { + // If the draining queue is slower than the publishing queue. + return emitResult == Sinks.EmitResult.FAIL_OVERFLOW; + }); + sink.success(); + } catch (Exception e) { + sink.error(new AmqpException(false, "Unable to buffer message for partition: " + getPartitionId(), errorContext)); + } + }); + }); + } + + /** + * Gets the partition id that this producer is publishing events to. + * + * @return The partition id that this producer is publishing events to. + */ + String getPartitionId() { + return partitionId; + } + + /** + * Publishes {@link EventDataBatch} and returns the result. + * + * @return A stream of published results. + */ + Flux publishEvents(Flux upstream) { + return Flux.defer(() -> { + return upstream.flatMap(batch -> { + return client.send(batch).thenReturn(new PublishResult(batch, null)) + // Resuming on error because an error is a terminal signal, so we want to wrap that with a result, + // so it doesn't stop publishing. + .onErrorResume(error -> Mono.just(new PublishResult(null, error))); + }); + }).retryWhen(Retry.from(signal -> { + if (isClosed.get()) { + return Mono.empty(); + } else { + return Mono.just(true); + } + })); + } + + @Override + public void close() { + if (isClosed.getAndSet(true)) { + return; + } + + publishSubscription.dispose(); + client.close(); + } + + /** + * Static class to hold results. 
+ */ + private static final class PublishResult { + private final EventDataBatch batch; + private final Throwable error; + + PublishResult(EventDataBatch batch, Throwable error) { + this.batch = batch; + this.error = error; + } + } +} From bde5331c6a134444dd97aa186efb7fdb18b39c75 Mon Sep 17 00:00:00 2001 From: Connie Date: Mon, 18 Jul 2022 14:05:57 -0700 Subject: [PATCH 04/29] Adding license info. --- .../com/azure/messaging/eventhubs/EventDataAggregator.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java index 6069c3188e2d..03981b991a77 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java @@ -1,3 +1,6 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + package com.azure.messaging.eventhubs; import com.azure.core.amqp.exception.AmqpErrorCondition; From 6f42173d1528b24250324ebaba6c7626bc1e1f94 Mon Sep 17 00:00:00 2001 From: Connie Date: Mon, 18 Jul 2022 15:29:04 -0700 Subject: [PATCH 05/29] Adding inline mock maker for final classes. 
--- .../resources/mockito-extensions/org.mockito.plugins.MockMaker | 1 + 1 file changed, 1 insertion(+) create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/sdk/eventhubs/azure-messaging-eventhubs/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 000000000000..1f0955d450f0 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline From 7c12be16135d88c6494140deea34bec37d83726c Mon Sep 17 00:00:00 2001 From: Connie Date: Mon, 18 Jul 2022 15:30:19 -0700 Subject: [PATCH 06/29] Fixing recursive subscription. --- .../eventhubs/EventDataAggregator.java | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java index 03981b991a77..47ae0af4ee9b 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java @@ -23,6 +23,14 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; +/** + * Aggregates {@link EventData} into {@link EventDataBatch} and pushes them downstream when: + * + *
+ * <ul>
+ *     <li>{@link BufferedProducerClientOptions#getMaxWaitTime()} elapses between events.</li>
+ *     <li>{@link EventDataBatch} cannot hold any more events.</li>
+ * </ul>
+ */ class EventDataAggregator extends FluxOperator { private static final ClientLogger LOGGER = new ClientLogger(EventDataAggregator.class); private final AtomicReference downstreamSubscription = new AtomicReference<>(); @@ -44,6 +52,12 @@ class EventDataAggregator extends FluxOperator { this.options = options; } + /** + * Subscribes to events from this operator. Downstream subscribers invoke this method and subscribe to events from + * it. + * + * @param actual Downstream subscriber. + */ @Override public void subscribe(CoreSubscriber actual) { final EventDataAggregatorMain subscription = new EventDataAggregatorMain(actual, namespace, options, @@ -53,8 +67,7 @@ public void subscribe(CoreSubscriber actual) { throw new IllegalArgumentException("Cannot resubscribe to multiple upstreams."); } - source.subscribeWith(subscription); - actual.onSubscribe(subscription); + source.subscribe(subscription); } /** @@ -129,6 +142,8 @@ public void cancel() { // Do not keep requesting more events upstream subscription.cancel(); + updateOrPublishBatch(null, true); + downstream.onComplete(); disposable.dispose(); } @@ -161,6 +176,7 @@ public void onError(Throwable t) { return; } + updateOrPublishBatch(null, true); downstream.onError(t); } @@ -170,6 +186,7 @@ public void onError(Throwable t) { @Override public void onComplete() { if (isCompleted.compareAndSet(false, true)) { + updateOrPublishBatch(null, true); downstream.onComplete(); } } From 3d8ffffe718c897c06cb2f6e304a7a217f21844f Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 19 Jul 2022 09:26:31 -0700 Subject: [PATCH 07/29] Do not get additional batches if the current instance is completed. 
--- .../azure/messaging/eventhubs/EventDataAggregator.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java index 47ae0af4ee9b..94eb5ca57409 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java @@ -256,7 +256,13 @@ private void publishDownstream() { }); logger.verbose("Batch published. Requested batches left: {}", batchesLeft); - this.currentBatch = batchSupplier.get(); + + if (!isCompleted.get()) { + this.currentBatch = batchSupplier.get(); + } else { + logger.verbose("Aggregator is completed. Not setting another batch."); + this.currentBatch = null; + } } } catch (Throwable e) { final Throwable error = Operators.onNextError(previous, e, downstream.currentContext(), subscription); From b6b21496517780ebabca4030d509cf0f091fe756 Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 19 Jul 2022 09:26:42 -0700 Subject: [PATCH 08/29] Adding tests. 
--- .../eventhubs/EventDataAggregatorTest.java | 310 ++++++++++++++++++ 1 file changed, 310 insertions(+) create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java new file mode 100644 index 000000000000..1a5f083f1cc1 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java @@ -0,0 +1,310 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs; + +import com.azure.core.amqp.exception.AmqpErrorCondition; +import com.azure.core.amqp.exception.AmqpException; +import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import reactor.test.StepVerifier; +import reactor.test.publisher.TestPublisher; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +public class EventDataAggregatorTest { + private static final String NAMESPACE = "test.namespace"; + + private AutoCloseable mockCloseable; + + @Mock + private EventDataBatch batch; + + @Mock + 
private EventDataBatch batch2; + + @Mock + private EventDataBatch batch3; + + private final EventData event1 = new EventData("foo"); + private final EventData event2 = new EventData("bar"); + private final EventData event3 = new EventData("baz"); + private final EventData event4 = new EventData("bart"); + + @BeforeEach + public void beforeEach() { + mockCloseable = MockitoAnnotations.openMocks(this); + } + + @AfterEach + public void afterEach() throws Exception { + if (mockCloseable != null) { + mockCloseable.close(); + } + + Mockito.framework().clearInlineMock(this); + } + + /** + * Tests that it pushes full batches downstream (when tryAdd returns fall). Also, publishes any batches downstream + * when upstream completes. + */ + @Test + public void pushesFullBatchesDownstream() { + // Arrange + final List batchEvents = new ArrayList<>(); + setupBatchMock(batch, batchEvents, event1, event2); + + final List batchEvents2 = new ArrayList<>(); + setupBatchMock(batch2, batchEvents2, event3); + + final Duration waitTime = Duration.ofSeconds(30); + final BufferedProducerClientOptions options = new BufferedProducerClientOptions(); + options.setMaxWaitTime(waitTime); + + final AtomicInteger first = new AtomicInteger(0); + final Supplier supplier = () -> { + final int current = first.getAndIncrement(); + + switch (current) { + case 0: + return batch; + case 1: + return batch2; + default: + throw new RuntimeException("pushesFullBatchesDownstream: Did not expect to get this many invocations."); + } + }; + + final TestPublisher publisher = TestPublisher.createCold(); + final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options); + + // Act & Assert + StepVerifier.create(aggregator) + .then(() -> { + publisher.next(event1, event2, event3); + }) + .expectNext(batch) + .then(() -> publisher.complete()) + .expectNext(batch2) + .expectComplete() + .verify(Duration.ofSeconds(10)); + } + + /** + * Tests that it pushes partial batches 
downstream when an error occurs. + */ + @Test + public void pushesBatchesAndError() { + // Arrange + final List batchEvents = new ArrayList<>(); + setupBatchMock(batch, batchEvents, event1, event2); + + final Duration waitTime = Duration.ofSeconds(30); + final BufferedProducerClientOptions options = new BufferedProducerClientOptions(); + options.setMaxWaitTime(waitTime); + + final AtomicBoolean first = new AtomicBoolean(); + final Supplier supplier = () -> { + if (first.compareAndSet(false, true)) { + return batch; + } else { + return batch2; + } + }; + + final TestPublisher publisher = TestPublisher.createCold(); + final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options); + final IllegalArgumentException testException = new IllegalArgumentException("Test exception."); + + // Act & Assert + StepVerifier.create(aggregator) + .then(() -> { + publisher.next(event1, event2); + publisher.error(testException); + }) + .expectNext(batch) + .expectErrorMatches(e -> e.equals(testException)) + .verify(Duration.ofSeconds(10)); + + // Verify that these events were added to the batch. + assertEquals(2, batchEvents.size()); + assertTrue(batchEvents.contains(event1)); + assertTrue(batchEvents.contains(event2)); + } + + /** + * Tests that batches are pushed downstream when max wait time has elapsed. 
+ */ + @Test + public void pushesBatchAfterMaxTime() { + // Arrange + final List batchEvents = new ArrayList<>(); + setupBatchMock(batch, batchEvents, event1, event2); + + final List batchEvents2 = new ArrayList<>(); + setupBatchMock(batch2, batchEvents2, event3); + + final Duration waitTime = Duration.ofSeconds(5); + final Duration halfWaitTime = waitTime.minusSeconds(2); + final BufferedProducerClientOptions options = new BufferedProducerClientOptions(); + options.setMaxWaitTime(waitTime); + + final AtomicInteger first = new AtomicInteger(0); + final Supplier supplier = () -> { + final int current = first.getAndIncrement(); + + switch (current) { + case 0: + return batch; + case 1: + return batch2; + default: + System.out.println("Invoked get batch for the xth time:" + current); + return batch3; + } + }; + + final TestPublisher publisher = TestPublisher.createCold(); + final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options); + + // Act & Assert + StepVerifier.create(aggregator) + .then(() -> publisher.next(event1)) + .thenAwait(halfWaitTime) + .then(() -> { + assertEquals(1, batchEvents.size()); + + publisher.next(event2); + }) + .thenAwait(waitTime) + .assertNext(b -> { + assertEquals(b, batch); + assertEquals(2, batchEvents.size()); + }) + .expectNoEvent(waitTime) + .then(() -> publisher.next(event3)) + .thenAwait(waitTime) + .assertNext(e -> { + assertEquals(e, batch2, "Should be equal."); + }) + .thenCancel() + .verify(); + } + + /** + * Verifies that an error is propagated when it is too large for the link. 
+ */ + @Test + public void errorsOnEventThatDoesNotFit() { + // Arrange + final List batchEvents = new ArrayList<>(); + setupBatchMock(batch, batchEvents); + + final Duration waitTime = Duration.ofSeconds(30); + final BufferedProducerClientOptions options = new BufferedProducerClientOptions(); + options.setMaxWaitTime(waitTime); + + final AtomicBoolean first = new AtomicBoolean(); + final Supplier supplier = () -> { + if (first.compareAndSet(false, true)) { + return batch; + } else { + throw new IllegalArgumentException("Did not expect another batch call."); + } + }; + + final TestPublisher publisher = TestPublisher.createCold(); + final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options); + + StepVerifier.create(aggregator) + .then(() -> { + publisher.next(event1); + }) + .consumeErrorWith(error -> { + assertTrue(error instanceof AmqpException); + assertEquals(AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, ((AmqpException) error).getErrorCondition()); + }) + .verify(Duration.ofSeconds(20)); + } + + /** + * Verifies that backpressure requests are supported. 
+ */ + @Test + public void respectsBackpressure() { + // Arrange + final List batchEvents = new ArrayList<>(); + setupBatchMock(batch, batchEvents, event1); + + final List batchEvents2 = new ArrayList<>(); + setupBatchMock(batch2, batchEvents2); + + final Duration waitTime = Duration.ofSeconds(3); + final BufferedProducerClientOptions options = new BufferedProducerClientOptions(); + options.setMaxWaitTime(waitTime); + + final AtomicInteger first = new AtomicInteger(0); + final Supplier supplier = () -> { + final int current = first.getAndIncrement(); + + switch (current) { + case 0: + return batch; + case 1: + return batch2; + default: + throw new RuntimeException("respectsBackpressure: Did not expect to get this many invocations."); + } + }; + + final TestPublisher publisher = TestPublisher.createCold(); + final EventDataAggregator aggregator = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options); + + final long request = 1L; + + StepVerifier.create(aggregator, request) + .then(() -> publisher.next(event1)) + .assertNext(b -> { + assertEquals(1, b.getCount()); + assertTrue(b.getEvents().contains(event1)); + }) + .expectNoEvent(waitTime) + .thenCancel() + .verify(); + + publisher.assertMaxRequested(request); + } + + private static void setupBatchMock(EventDataBatch batch, List resultSet, EventData... 
acceptedEvents) { + when(batch.tryAdd(any(EventData.class))).thenAnswer(invocation -> { + final EventData arg = invocation.getArgument(0); + + final boolean matches = Arrays.asList(acceptedEvents).contains(arg); + + if (matches) { + resultSet.add(arg); + } + + return matches; + }); + when(batch.getEvents()).thenAnswer(invocation -> resultSet); + when(batch.getCount()).thenAnswer(invocation -> resultSet.size()); + } +} From 0f7353f19c8a489aee5ec1c2437b20e3ef5ab676 Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 19 Jul 2022 09:33:23 -0700 Subject: [PATCH 09/29] Rename to EventHubBufferedPartitionProducer --- ...ublisher.java => EventHubBufferedPartitionProducer.java} | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) rename sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/{EventHubPartitionPublisher.java => EventHubBufferedPartitionProducer.java} (95%) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionPublisher.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java similarity index 95% rename from sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionPublisher.java rename to sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java index e2623d914984..5af640dec6ab 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubPartitionPublisher.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java @@ -26,7 +26,7 @@ /** * Keeps track of publishing events to a partition. 
*/ -class EventHubPartitionPublisher implements Closeable { +class EventHubBufferedPartitionProducer implements Closeable { private final ClientLogger logger; private final EventHubProducerAsyncClient client; private final String partitionId; @@ -37,7 +37,7 @@ class EventHubPartitionPublisher implements Closeable { private final Disposable publishSubscription; private final Sinks.Many eventSink; - EventHubPartitionPublisher(EventHubProducerAsyncClient client, String partitionId, + EventHubBufferedPartitionProducer(EventHubProducerAsyncClient client, String partitionId, BufferedProducerClientOptions options) { this.client = client; this.partitionId = partitionId; @@ -50,7 +50,7 @@ class EventHubPartitionPublisher implements Closeable { return Duration.ZERO; }, () -> Duration.ZERO); - this.logger = new ClientLogger(EventHubPartitionPublisher.class + "-" + partitionId); + this.logger = new ClientLogger(EventHubBufferedPartitionProducer.class + "-" + partitionId); final Supplier> queueSupplier = Queues.get(options.getMaxEventBufferLengthPerPartition()); this.eventSink = Sinks.many().unicast().onBackpressureBuffer(queueSupplier.get()); From 5175dab6dd68344da011739b8d05643989dacaf3 Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 19 Jul 2022 10:05:23 -0700 Subject: [PATCH 10/29] Applying retryWhen policy to Aggregator. 
--- .../EventHubBufferedPartitionProducer.java | 64 +++++++++---------- 1 file changed, 30 insertions(+), 34 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java index 5af640dec6ab..14f5d4e2a7b5 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java @@ -18,7 +18,6 @@ import reactor.util.retry.Retry; import java.io.Closeable; -import java.time.Duration; import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -33,9 +32,15 @@ class EventHubBufferedPartitionProducer implements Closeable { private final BufferedProducerClientOptions options; private final AmqpErrorContext errorContext; private final AtomicBoolean isClosed = new AtomicBoolean(false); - private final Mono linkSizeMono; private final Disposable publishSubscription; private final Sinks.Many eventSink; + private final Retry retryWhenPolicy = Retry.from(signal -> { + if (isClosed.get()) { + return Mono.empty(); + } else { + return Mono.just(true); + } + }); EventHubBufferedPartitionProducer(EventHubProducerAsyncClient client, String partitionId, BufferedProducerClientOptions options) { @@ -43,41 +48,37 @@ class EventHubBufferedPartitionProducer implements Closeable { this.partitionId = partitionId; this.options = options; this.errorContext = new AmqpErrorContext(client.getFullyQualifiedNamespace()); - this.linkSizeMono = client.createBatch().map(batch -> batch.getMaxSizeInBytes()) - .publishOn(Schedulers.boundedElastic()) - .cache(value -> Duration.ofMillis(Long.MAX_VALUE), - error -> { - return Duration.ZERO; - }, () -> 
Duration.ZERO); this.logger = new ClientLogger(EventHubBufferedPartitionProducer.class + "-" + partitionId); final Supplier> queueSupplier = Queues.get(options.getMaxEventBufferLengthPerPartition()); this.eventSink = Sinks.many().unicast().onBackpressureBuffer(queueSupplier.get()); - - final EventDataAggregator eventDataBatchFlux = new EventDataAggregator(eventSink.asFlux(), - () -> client.createBatch().subscribeOn(Schedulers.boundedElastic()).block(), - client.getFullyQualifiedNamespace(), options); + final Flux eventDataBatchFlux = Flux.defer(() -> { + return new EventDataAggregator(eventSink.asFlux(), + () -> { + return client.createBatch().subscribeOn(Schedulers.boundedElastic()).block(); + }, + client.getFullyQualifiedNamespace(), options); + }).retryWhen(retryWhenPolicy); this.publishSubscription = publishEvents(eventDataBatchFlux) .publishOn(Schedulers.boundedElastic()) .subscribe(result -> { - if (result.error == null) { - options.getSendSucceededContext().accept(new SendBatchSucceededContext(result.batch.getEvents(), - partitionId)); - } else { - options.getSendFailedContext().accept(new SendBatchFailedContext(result.batch.getEvents(), - partitionId, result.error)); - } - }, error -> { - logger.error("Publishing subscription completed and ended in an error.", error); - options.getSendFailedContext().accept(new SendBatchFailedContext(null, partitionId, error)); - }, - () -> { - logger.info("Publishing subscription completed."); - }); - + if (result.error == null) { + options.getSendSucceededContext().accept(new SendBatchSucceededContext(result.batch.getEvents(), + partitionId)); + } else { + options.getSendFailedContext().accept(new SendBatchFailedContext(result.batch.getEvents(), + partitionId, result.error)); + } + }, error -> { + logger.error("Publishing subscription completed and ended in an error.", error); + options.getSendFailedContext().accept(new SendBatchFailedContext(null, partitionId, error)); + }, + () -> { + logger.info("Publishing 
subscription completed."); + }); } Mono enqueueEvent(EventData eventData) { @@ -111,6 +112,7 @@ String getPartitionId() { * @return A stream of published results. */ Flux publishEvents(Flux upstream) { + return Flux.defer(() -> { return upstream.flatMap(batch -> { return client.send(batch).thenReturn(new PublishResult(batch, null)) @@ -118,13 +120,7 @@ Flux publishEvents(Flux upstream) { // so it doesn't stop publishing. .onErrorResume(error -> Mono.just(new PublishResult(null, error))); }); - }).retryWhen(Retry.from(signal -> { - if (isClosed.get()) { - return Mono.empty(); - } else { - return Mono.just(true); - } - })); + }).retryWhen(retryWhenPolicy); } @Override From 95c24e71eae57d6298566ecf7ef7beacbcbf5790 Mon Sep 17 00:00:00 2001 From: Connie Date: Tue, 19 Jul 2022 11:29:08 -0700 Subject: [PATCH 11/29] Remove unused EventData field --- .../com/azure/messaging/eventhubs/EventDataAggregatorTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java index 1a5f083f1cc1..1dc53ab98e52 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java @@ -45,7 +45,6 @@ public class EventDataAggregatorTest { private final EventData event1 = new EventData("foo"); private final EventData event2 = new EventData("bar"); private final EventData event3 = new EventData("baz"); - private final EventData event4 = new EventData("bart"); @BeforeEach public void beforeEach() { From 12c34e7840213934f52170b50eb97c6ffba63598 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 20 Jul 2022 10:21:28 -0700 Subject: [PATCH 12/29] Adding documentation to mock helper. 
--- .../eventhubs/EventDataAggregatorTest.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java index 1dc53ab98e52..60f03efaa9e3 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java @@ -291,7 +291,15 @@ public void respectsBackpressure() { publisher.assertMaxRequested(request); } - private static void setupBatchMock(EventDataBatch batch, List resultSet, EventData... acceptedEvents) { + /** + * Helper method to set up mocked {@link EventDataBatch} to accept the given events in {@code resultSet}. + * + * @param batch Mocked batch. + * @param resultSet List to store added EventData in. + * @param acceptedEvents EventData that is accepted by the batch when {@link EventDataBatch#tryAdd(EventData)} + * is invoked. + */ + static void setupBatchMock(EventDataBatch batch, List resultSet, EventData... acceptedEvents) { when(batch.tryAdd(any(EventData.class))).thenAnswer(invocation -> { final EventData arg = invocation.getArgument(0); @@ -303,7 +311,9 @@ private static void setupBatchMock(EventDataBatch batch, List resultS return matches; }); - when(batch.getEvents()).thenAnswer(invocation -> resultSet); + when(batch.getEvents()).thenAnswer(invocation -> { + return resultSet; + }); when(batch.getCount()).thenAnswer(invocation -> resultSet.size()); } } From 27d8665e11cd3484662bfaf57d068113f5713ba5 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 20 Jul 2022 10:22:24 -0700 Subject: [PATCH 13/29] Hack around the blocking call. 
--- .../EventHubBufferedPartitionProducer.java | 37 +++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java index 14f5d4e2a7b5..3b2c6b38c99a 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java @@ -7,6 +7,7 @@ import com.azure.core.amqp.exception.AmqpException; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions; +import com.azure.messaging.eventhubs.models.CreateBatchOptions; import com.azure.messaging.eventhubs.models.SendBatchFailedContext; import com.azure.messaging.eventhubs.models.SendBatchSucceededContext; import reactor.core.Disposable; @@ -19,6 +20,7 @@ import java.io.Closeable; import java.util.Queue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -29,7 +31,6 @@ class EventHubBufferedPartitionProducer implements Closeable { private final ClientLogger logger; private final EventHubProducerAsyncClient client; private final String partitionId; - private final BufferedProducerClientOptions options; private final AmqpErrorContext errorContext; private final AtomicBoolean isClosed = new AtomicBoolean(false); private final Disposable publishSubscription; @@ -41,13 +42,14 @@ class EventHubBufferedPartitionProducer implements Closeable { return Mono.just(true); } }); + private final CreateBatchOptions createBatchOptions; EventHubBufferedPartitionProducer(EventHubProducerAsyncClient client, String 
partitionId, BufferedProducerClientOptions options) { this.client = client; this.partitionId = partitionId; - this.options = options; this.errorContext = new AmqpErrorContext(client.getFullyQualifiedNamespace()); + this.createBatchOptions = new CreateBatchOptions().setPartitionId(partitionId); this.logger = new ClientLogger(EventHubBufferedPartitionProducer.class + "-" + partitionId); @@ -57,7 +59,12 @@ class EventHubBufferedPartitionProducer implements Closeable { final Flux eventDataBatchFlux = Flux.defer(() -> { return new EventDataAggregator(eventSink.asFlux(), () -> { - return client.createBatch().subscribeOn(Schedulers.boundedElastic()).block(); + final Mono batch = client.createBatch(createBatchOptions); + try { + return batch.toFuture().get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Unexpected exception when getting batch.", e); + } }, client.getFullyQualifiedNamespace(), options); }).retryWhen(retryWhenPolicy); @@ -106,33 +113,33 @@ String getPartitionId() { return partitionId; } + @Override + public void close() { + if (isClosed.getAndSet(true)) { + return; + } + + publishSubscription.dispose(); + client.close(); + } + /** * Publishes {@link EventDataBatch} and returns the result. * * @return A stream of published results. */ - Flux publishEvents(Flux upstream) { + private Flux publishEvents(Flux upstream) { return Flux.defer(() -> { return upstream.flatMap(batch -> { return client.send(batch).thenReturn(new PublishResult(batch, null)) // Resuming on error because an error is a terminal signal, so we want to wrap that with a result, // so it doesn't stop publishing. 
- .onErrorResume(error -> Mono.just(new PublishResult(null, error))); + .onErrorResume(error -> Mono.just(new PublishResult(batch, error))); }); }).retryWhen(retryWhenPolicy); } - @Override - public void close() { - if (isClosed.getAndSet(true)) { - return; - } - - publishSubscription.dispose(); - client.close(); - } - /** * Static class to hold results. */ From e3299abf7e30604a80f160213469cf7c0fc1da41 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 20 Jul 2022 10:22:35 -0700 Subject: [PATCH 14/29] Adding tests for EventHubBufferedPartitionProducerTest --- ...EventHubBufferedPartitionProducerTest.java | 289 ++++++++++++++++++ 1 file changed, 289 insertions(+) create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java new file mode 100644 index 000000000000..ef1eb2206847 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java @@ -0,0 +1,289 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package com.azure.messaging.eventhubs; + +import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions; +import com.azure.messaging.eventhubs.models.CreateBatchOptions; +import com.azure.messaging.eventhubs.models.SendBatchFailedContext; +import com.azure.messaging.eventhubs.models.SendBatchSucceededContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import static com.azure.messaging.eventhubs.EventDataAggregatorTest.setupBatchMock; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +/** + * Tests {@link EventHubBufferedPartitionProducer} + */ +public class EventHubBufferedPartitionProducerTest { + private static final String PARTITION_ID = "10"; + private static final String NAMESPACE = "test-eventhubs-namespace"; + private static final String EVENT_HUB_NAME = "test-hub"; + private static final List PARTITION_IDS = Arrays.asList("one", "two", PARTITION_ID, "four"); + + private AutoCloseable mockCloseable; + + private final Semaphore successSemaphore = new Semaphore(1); + private final Semaphore failedSemaphore = new Semaphore(1); + private final EventData event1 = new EventData("foo"); + private final EventData event2 = new 
EventData("bar"); + private final EventData event3 = new EventData("baz"); + private final EventData event4 = new EventData("bart"); + + private final Queue returnedBatches = new LinkedList<>(); + + @Mock + private EventHubProducerAsyncClient client; + + @Mock + private EventDataBatch batch; + + @Mock + private EventDataBatch batch2; + + @Mock + private EventDataBatch batch3; + + @Mock + private EventDataBatch batch4; + + @Mock + private EventDataBatch batch5; + + @BeforeEach + public void beforeEach() { + mockCloseable = MockitoAnnotations.openMocks(this); + + returnedBatches.add(batch); + returnedBatches.add(batch2); + returnedBatches.add(batch3); + returnedBatches.add(batch4); + returnedBatches.add(batch5); + + when(client.getFullyQualifiedNamespace()).thenReturn(NAMESPACE); + when(client.getEventHubName()).thenReturn(EVENT_HUB_NAME); + when(client.getPartitionIds()).thenReturn(Flux.fromIterable(PARTITION_IDS)); + + when(client.createBatch(any(CreateBatchOptions.class))).thenAnswer(invocation -> { + final EventDataBatch returned = returnedBatches.poll(); + assertNotNull(returned, "there should be more batches to be returned."); + return Mono.just(returned); + }); + } + + @AfterEach + public void afterEach() throws Exception { + if (mockCloseable != null) { + mockCloseable.close(); + } + + Mockito.framework().clearInlineMock(this); + } + + @Test + public void publishesEvents() throws InterruptedException { + // Arrange + successSemaphore.acquire(); + failedSemaphore.acquire(); + + final InvocationHolder holder = new InvocationHolder(); + final BufferedProducerClientOptions options = new BufferedProducerClientOptions(); + options.setMaxWaitTime(Duration.ofSeconds(5)); + options.setSendSucceededContext(holder::onSucceed); + options.setSendFailedContext(holder::onFailed); + + final Duration waitTime = options.getMaxWaitTime().plus(options.getMaxWaitTime()); + + final List batchEvents = new ArrayList<>(); + setupBatchMock(batch, batchEvents, event1, event2); + + 
when(client.send(any(EventDataBatch.class))).thenReturn(Mono.empty()); + + final EventHubBufferedPartitionProducer producer = new EventHubBufferedPartitionProducer(client, PARTITION_ID, + options); + + // Act & Assert + StepVerifier.create(producer.enqueueEvent(event1)) + .verifyComplete(); + + StepVerifier.create(producer.enqueueEvent(event2)) + .verifyComplete(); + + assertTrue(successSemaphore.tryAcquire(waitTime.toMillis(), TimeUnit.MILLISECONDS), + "Should have been able to get a successful batch pushed downstream."); + + assertEquals(1, holder.succeededContexts.size()); + + final SendBatchSucceededContext first = holder.succeededContexts.get(0); + assertEquals(PARTITION_ID, first.getPartitionId()); + assertEquals(batchEvents, first.getEvents()); + + assertEquals(2, batchEvents.size()); + + assertTrue(holder.failedContexts.isEmpty()); + } + + @Test + public void publishesErrors() throws InterruptedException { + // Arrange + successSemaphore.acquire(); + failedSemaphore.acquire(); + + final InvocationHolder holder = new InvocationHolder(); + final BufferedProducerClientOptions options = new BufferedProducerClientOptions(); + options.setMaxWaitTime(Duration.ofSeconds(5)); + options.setSendSucceededContext(holder::onSucceed); + options.setSendFailedContext(holder::onFailed); + + final Duration waitTime = options.getMaxWaitTime().plus(options.getMaxWaitTime()); + + final List batchEvents = new ArrayList<>(); + setupBatchMock(batch, batchEvents, event1, event2); + + final List batchEvents2 = new ArrayList<>(); + setupBatchMock(batch2, batchEvents2, event3, event4); + + final Throwable error = new IllegalStateException("test-options."); + when(client.send(any(EventDataBatch.class))).thenReturn(Mono.empty(), Mono.error(error)); + + final EventHubBufferedPartitionProducer producer = new EventHubBufferedPartitionProducer(client, PARTITION_ID, + options); + + // Act & Assert + StepVerifier.create(Mono.when(producer.enqueueEvent(event1), producer.enqueueEvent(event2))) + 
.verifyComplete(); + + StepVerifier.create(Mono.when(producer.enqueueEvent(event3))) + .thenAwait(options.getMaxWaitTime()) + .verifyComplete(); + + assertTrue(successSemaphore.tryAcquire(waitTime.toMillis(), TimeUnit.MILLISECONDS), + "Should have been able to get a successful signal downstream."); + + assertTrue(failedSemaphore.tryAcquire(waitTime.toMillis(), TimeUnit.MILLISECONDS), + "Should have been able to get a successful error downstream."); + + assertEquals(1, holder.succeededContexts.size()); + + final SendBatchSucceededContext first = holder.succeededContexts.get(0); + assertEquals(PARTITION_ID, first.getPartitionId()); + assertEquals(batchEvents, first.getEvents()); + + assertEquals(2, batchEvents.size()); + + assertEquals(1, holder.failedContexts.size()); + } + + /** + * Checks that after an error publishing one batch, it can still publish subsequent batches successfully. + */ + @Test + public void canPublishAfterErrors() throws InterruptedException { + // Arrange + final CountDownLatch success = new CountDownLatch(2); + failedSemaphore.acquire(); + + final InvocationHolder holder = new InvocationHolder(); + final BufferedProducerClientOptions options = new BufferedProducerClientOptions(); + options.setMaxWaitTime(Duration.ofSeconds(5)); + options.setSendSucceededContext(context -> { + holder.onSucceed(context); + success.countDown(); + }); + options.setSendFailedContext(context -> holder.onFailed(context)); + + final Duration waitTime = options.getMaxWaitTime().plus(options.getMaxWaitTime()); + + final List batchEvents = new ArrayList<>(); + setupBatchMock(batch, batchEvents, event1, event2); + + final List batchEvents2 = new ArrayList<>(); + setupBatchMock(batch2, batchEvents2, event3); + + final List batchEvents3 = new ArrayList<>(); + final EventData event5 = new EventData("five"); + setupBatchMock(batch3, batchEvents3, event4, event5); + + final Throwable error = new IllegalStateException("test-options."); + + final Queue> responses = new 
LinkedList<>(); + responses.add(Mono.empty()); + responses.add(Mono.error(error)); + responses.add(Mono.empty()); + when(client.send(any(EventDataBatch.class))).thenAnswer(invocation -> { + return responses.poll(); + }); + + final EventHubBufferedPartitionProducer producer = new EventHubBufferedPartitionProducer(client, PARTITION_ID, + options); + + // Act & Assert + StepVerifier.create(Mono.when(producer.enqueueEvent(event1), producer.enqueueEvent(event2))) + .verifyComplete(); + + StepVerifier.create(producer.enqueueEvent(event3)) + .thenAwait(options.getMaxWaitTime()) + .verifyComplete(); + + StepVerifier.create(Mono.when(producer.enqueueEvent(event4), producer.enqueueEvent(event5))) + .thenAwait(options.getMaxWaitTime()) + .verifyComplete(); + + assertTrue(success.await(waitTime.toMillis(), TimeUnit.MILLISECONDS), + "Should have been able to get a successful signal downstream."); + + assertTrue(failedSemaphore.tryAcquire(waitTime.toMillis(), TimeUnit.MILLISECONDS), + "Should have been able to get a successful error downstream."); + + assertEquals(2, holder.succeededContexts.size()); + + // Verify the completed ones. + final SendBatchSucceededContext first = holder.succeededContexts.get(0); + assertEquals(PARTITION_ID, first.getPartitionId()); + assertEquals(batchEvents, first.getEvents()); + + final SendBatchSucceededContext second = holder.succeededContexts.get(1); + assertEquals(PARTITION_ID, second.getPartitionId()); + assertEquals(batchEvents3, second.getEvents()); + + // Verify the failed ones. 
+ assertEquals(1, holder.failedContexts.size()); + } + + private class InvocationHolder { + private final List succeededContexts = new ArrayList<>(); + private final List failedContexts = new ArrayList<>(); + + void onSucceed(SendBatchSucceededContext result) { + succeededContexts.add(result); + successSemaphore.release(); + } + + void onFailed(SendBatchFailedContext result) { + failedContexts.add(result); + failedSemaphore.release(); + } + } +} From b7e46011a8a39f85e77cdb1faa6360939a910806 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 20 Jul 2022 23:06:57 -0700 Subject: [PATCH 15/29] Populating PartitionProcessors for each partition id. --- .../EventHubBufferedProducerAsyncClient.java | 48 +++++++++++-------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient.java index b91926b203fe..a6c00ea851b5 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient.java @@ -15,8 +15,7 @@ import java.io.Closeable; import java.time.Duration; -import java.util.HashMap; -import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; /** @@ -53,31 +52,35 @@ @ServiceClient(builder = EventHubBufferedProducerClientBuilder.class, isAsync = true) public final class EventHubBufferedProducerAsyncClient implements Closeable { private final ClientLogger logger = new ClientLogger(EventHubBufferedProducerAsyncClient.class); - private final EventHubAsyncClient client; + private final EventHubProducerAsyncClient client; private final EventHubClientBuilder builder; 
private final BufferedProducerClientOptions clientOptions; + private final Mono initialisationMono; // Key: partitionId. - private final HashMap> partitionBatchMap = new HashMap<>(); - private final Mono initialisationMono; + private final ConcurrentHashMap partitionProducers = + new ConcurrentHashMap<>(); EventHubBufferedProducerAsyncClient(EventHubClientBuilder builder, BufferedProducerClientOptions clientOptions) { this.builder = builder; - this.client = builder.buildAsyncClient(); + this.client = builder.buildAsyncProducerClient(); this.clientOptions = clientOptions; - initialisationMono = Mono.using( + this.initialisationMono = Mono.using( () -> builder.buildAsyncClient(), - eventHubClient -> eventHubClient.getPartitionIds() - .handle((partitionId, sink) -> { - try { - partitionBatchMap.put(partitionId, new ConcurrentLinkedDeque<>()); - sink.complete(); - } catch (Exception e) { - sink.error(e); - } - }).then(), - eventHubClient -> eventHubClient.close()); + eventHubClient -> { + return eventHubClient.getPartitionIds() + .handle((partitionId, sink) -> { + try { + partitionProducers.put(partitionId, new EventHubBufferedPartitionProducer(client, + partitionId, clientOptions)); + sink.complete(); + } catch (Exception e) { + sink.error(e); + } + }).then(); + }, + eventHubClient -> eventHubClient.close()).cache(); } /** @@ -106,7 +109,7 @@ public String getEventHubName() { */ @ServiceMethod(returns = ReturnType.SINGLE) public Mono getEventHubProperties() { - return client.getProperties(); + return client.getEventHubProperties(); } /** @@ -141,7 +144,10 @@ public Mono getPartitionProperties(String partitionId) { * partitions. */ public int getBufferedEventCount() { - return 0; + return partitionProducers.values() + .parallelStream() + .mapToInt(producer -> producer.getBufferedEventCount()) + .sum(); } /** @@ -152,7 +158,9 @@ public int getBufferedEventCount() { * @return The number of events that are buffered and waiting to be published for a given partition. 
*/ public int getBufferedEventCount(String partitionId) { - return 0; + final EventHubBufferedPartitionProducer producer = partitionProducers.get(partitionId); + + return producer != null ? producer.getBufferedEventCount() : 0; } /** From 6306048edb635534eb6e3abf1634a2b384e46406 Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 20 Jul 2022 23:07:20 -0700 Subject: [PATCH 16/29] Creating private static class to hold PublishResults from buffered producer. --- .../EventHubBufferedPartitionProducer.java | 83 +++++++++++++++---- 1 file changed, 65 insertions(+), 18 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java index 3b2c6b38c99a..7cae1fcbd4fa 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java @@ -10,7 +10,9 @@ import com.azure.messaging.eventhubs.models.CreateBatchOptions; import com.azure.messaging.eventhubs.models.SendBatchFailedContext; import com.azure.messaging.eventhubs.models.SendBatchSucceededContext; +import org.reactivestreams.Subscription; import reactor.core.Disposable; +import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; @@ -22,6 +24,7 @@ import java.util.Queue; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; import java.util.function.Supplier; /** @@ -43,6 +46,7 @@ class EventHubBufferedPartitionProducer implements Closeable { } }); private final CreateBatchOptions createBatchOptions; + private final Queue eventQueue; 
EventHubBufferedPartitionProducer(EventHubProducerAsyncClient client, String partitionId, BufferedProducerClientOptions options) { @@ -54,7 +58,8 @@ class EventHubBufferedPartitionProducer implements Closeable { this.logger = new ClientLogger(EventHubBufferedPartitionProducer.class + "-" + partitionId); final Supplier> queueSupplier = Queues.get(options.getMaxEventBufferLengthPerPartition()); - this.eventSink = Sinks.many().unicast().onBackpressureBuffer(queueSupplier.get()); + this.eventQueue = queueSupplier.get(); + this.eventSink = Sinks.many().unicast().onBackpressureBuffer(eventQueue); final Flux eventDataBatchFlux = Flux.defer(() -> { return new EventDataAggregator(eventSink.asFlux(), @@ -69,23 +74,12 @@ class EventHubBufferedPartitionProducer implements Closeable { client.getFullyQualifiedNamespace(), options); }).retryWhen(retryWhenPolicy); + final PublishResultSubscriber publishResultSubscriber = new PublishResultSubscriber(partitionId, + options.getSendSucceededContext(), options.getSendFailedContext(), logger); + this.publishSubscription = publishEvents(eventDataBatchFlux) - .publishOn(Schedulers.boundedElastic()) - .subscribe(result -> { - if (result.error == null) { - options.getSendSucceededContext().accept(new SendBatchSucceededContext(result.batch.getEvents(), - partitionId)); - } else { - options.getSendFailedContext().accept(new SendBatchFailedContext(result.batch.getEvents(), - partitionId, result.error)); - } - }, error -> { - logger.error("Publishing subscription completed and ended in an error.", error); - options.getSendFailedContext().accept(new SendBatchFailedContext(null, partitionId, error)); - }, - () -> { - logger.info("Publishing subscription completed."); - }); + .publishOn(Schedulers.boundedElastic(), 1) + .subscribeWith(publishResultSubscriber); } Mono enqueueEvent(EventData eventData) { @@ -113,6 +107,15 @@ String getPartitionId() { return partitionId; } + /** + * Gets the number of events in queue. 
+ * + * @return the number of events in the queue. + */ + int getBufferedEventCount() { + return eventQueue.size(); + } + @Override public void close() { if (isClosed.getAndSet(true)) { @@ -136,7 +139,7 @@ private Flux publishEvents(Flux upstream) { // Resuming on error because an error is a terminal signal, so we want to wrap that with a result, // so it doesn't stop publishing. .onErrorResume(error -> Mono.just(new PublishResult(batch, error))); - }); + }, 1, 1); }).retryWhen(retryWhenPolicy); } @@ -152,4 +155,48 @@ private static final class PublishResult { this.error = error; } } + + private static final class PublishResultSubscriber extends BaseSubscriber { + private static final long REQUEST = 1L; + private final String partitionId; + private final Consumer onSucceed; + private final Consumer onFailed; + private final ClientLogger logger; + + PublishResultSubscriber(String partitionId, Consumer onSucceed, + Consumer onFailed, ClientLogger logger) { + this.partitionId = partitionId; + this.onSucceed = onSucceed; + this.onFailed = onFailed; + this.logger = logger; + } + + @Override + protected void hookOnSubscribe(Subscription subscription) { + upstream().request(REQUEST); + } + + @Override + protected void hookOnNext(PublishResult result) { + if (result.error == null) { + onSucceed.accept(new SendBatchSucceededContext(result.batch.getEvents(), partitionId)); + } else { + onFailed.accept(new SendBatchFailedContext(result.batch.getEvents(), partitionId, result.error)); + } + + // Request one more PublishResult, which is equivalent to asking for another batch. 
+ upstream().request(REQUEST); + } + + @Override + protected void hookOnError(Throwable throwable) { + logger.error("Publishing subscription completed and ended in an error.", throwable); + onFailed.accept(new SendBatchFailedContext(null, partitionId, throwable)); + } + + @Override + protected void hookOnComplete() { + logger.info("Publishing subscription completed."); + } + } } From 4c91c4a5b94e139d0c5dbdb4bef6b389b1a26d7d Mon Sep 17 00:00:00 2001 From: Connie Date: Wed, 20 Jul 2022 23:07:36 -0700 Subject: [PATCH 17/29] Adding tests to ensure upstream requests are respected. --- ...EventHubBufferedPartitionProducerTest.java | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java index ef1eb2206847..52585dd36f5f 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java @@ -272,6 +272,71 @@ public void canPublishAfterErrors() throws InterruptedException { assertEquals(1, holder.failedContexts.size()); } + @Test + public void getBufferedEventCounts() throws InterruptedException { + // Arrange + final CountDownLatch success = new CountDownLatch(2); + failedSemaphore.acquire(); + + final InvocationHolder holder = new InvocationHolder(); + final BufferedProducerClientOptions options = new BufferedProducerClientOptions(); + options.setMaxWaitTime(Duration.ofSeconds(3)); + options.setSendSucceededContext(context -> { + System.out.println("Batch received."); + holder.onSucceed(context); + success.countDown(); + }); + options.setSendFailedContext(context -> holder.onFailed(context)); + + final 
Duration waitTime = options.getMaxWaitTime().plus(options.getMaxWaitTime()); + + final List batchEvents = new ArrayList<>(); + setupBatchMock(batch, batchEvents, event1); + + final List batchEvents2 = new ArrayList<>(); + setupBatchMock(batch2, batchEvents2, event2, event3); + + final List batchEvents3 = new ArrayList<>(); + final EventData event5 = new EventData("five"); + setupBatchMock(batch3, batchEvents3, event4, event5); + + // Delaying send operation. + when(client.send(any(EventDataBatch.class))).thenAnswer(invocation -> Mono.delay(options.getMaxWaitTime()).then()); + + final EventHubBufferedPartitionProducer producer = new EventHubBufferedPartitionProducer(client, PARTITION_ID, + options); + + // Act & Assert + StepVerifier.create(Mono.when(producer.enqueueEvent(event1), producer.enqueueEvent(event2), producer.enqueueEvent(event3)), 1L) + .then(() -> { + // event1 was enqueued, event2 is in a batch, and event3 is currently in the queue waiting to be + // pushed downstream. + // batch1 (with event1) is being sent at the moment with the delay of options.getMaxWaitTime(), so the + // buffer doesn't drain so quickly. + final int bufferedEventCount = producer.getBufferedEventCount(); + assertEquals(1, bufferedEventCount); + }) + .verifyComplete(); + + StepVerifier.create(Mono.when(producer.enqueueEvent(event4), producer.enqueueEvent(event5))) + .verifyComplete(); + + final long totalTime = waitTime.toMillis() + waitTime.toMillis(); + assertTrue(success.await(totalTime, TimeUnit.MILLISECONDS), + "Should have been able to get a successful signal downstream."); + + assertEquals(2, holder.succeededContexts.size()); + + // Verify the completed ones. 
+ final SendBatchSucceededContext first = holder.succeededContexts.get(0); + assertEquals(PARTITION_ID, first.getPartitionId()); + assertEquals(batchEvents, first.getEvents()); + + final SendBatchSucceededContext second = holder.succeededContexts.get(1); + assertEquals(PARTITION_ID, second.getPartitionId()); + assertEquals(batchEvents2, second.getEvents()); + } + private class InvocationHolder { private final List succeededContexts = new ArrayList<>(); private final List failedContexts = new ArrayList<>(); From 5b162855904aada83ebea832b5216a826f375105 Mon Sep 17 00:00:00 2001 From: Connie Date: Thu, 21 Jul 2022 01:00:24 -0700 Subject: [PATCH 18/29] Add suppressio for fallthrough --- .../com/azure/messaging/eventhubs/PartitionResolver.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java index 5947504c73a0..35ac4f49230f 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java @@ -12,7 +12,7 @@ import java.util.concurrent.atomic.AtomicInteger; /** - * Allows events to be resolved to partitions using common patterns such as round-robin assignment and hashing of + * Allows events to be resolved to a partition using common patterns such as round-robin assignment and hashing of * partitions keys. */ class PartitionResolver { @@ -93,12 +93,15 @@ static short generateHashCode(String partitionKey) { * This implementation is a direct port of the Event Hubs service code; it is intended to match the gateway hashing * algorithm as closely as possible and should not be adjusted without careful consideration. 
* + * NOTE: Suppressing fallthrough warning for switch-case because we want it to fall into the subsequent cases. + * * @param data The data to base the hash on. * @param seed1 Seed value for the first hash. * @param seed2 Seed value for the second hash. * * @return An object containing the computed hash for {@code seed1} and {@code seed2}. */ + @SuppressWarnings("fallthrough") private static Hashed computeHash(byte[] data, int seed1, int seed2) { int a; int b; From 8b70e4f1c52292561115972301a9a91afbf19a11 Mon Sep 17 00:00:00 2001 From: Azure SDK Bot <53356347+azure-sdk@users.noreply.github.com> Date: Mon, 11 Jul 2022 10:42:44 -0700 Subject: [PATCH 19/29] Use --no-cone in pipeline sparse checkout script (#29905) Co-authored-by: Ben Broderick Phillips --- eng/common/pipelines/templates/steps/sparse-checkout.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/eng/common/pipelines/templates/steps/sparse-checkout.yml b/eng/common/pipelines/templates/steps/sparse-checkout.yml index 49f9eb553b04..a3b553b3a7a7 100644 --- a/eng/common/pipelines/templates/steps/sparse-checkout.yml +++ b/eng/common/pipelines/templates/steps/sparse-checkout.yml @@ -44,8 +44,10 @@ steps: Write-Host "git sparse-checkout init" git sparse-checkout init - Write-Host "git sparse-checkout set '/*' '!/*/' '/eng'" - git sparse-checkout set '/*' '!/*/' '/eng' + # Set non-cone mode otherwise path filters will not work in git >= 2.37.0 + # See https://github.blog/2022-06-27-highlights-from-git-2-37/#tidbits + Write-Host "git sparse-checkout set --no-cone '/*' '!/*/' '/eng'" + git sparse-checkout set --no-cone '/*' '!/*/' '/eng' } # Prevent wildcard expansion in Invoke-Expression (e.g. for checkout path '/*') From bcb504ad2ebee6898a75ba0ef83b72c2c57f6424 Mon Sep 17 00:00:00 2001 From: Connie Date: Thu, 21 Jul 2022 12:35:48 -0700 Subject: [PATCH 20/29] Fix warnings. 
--- .../com/azure/messaging/eventhubs/PartitionResolver.java | 8 +++++--- .../eventhubs/EventHubBufferedPartitionProducerTest.java | 1 + 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java index 35ac4f49230f..73cfca33c0ad 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java @@ -110,10 +110,12 @@ private static Hashed computeHash(byte[] data, int seed1, int seed2) { a = b = c = (0xdeadbeef + data.length + seed1); c += seed2; + final ByteBuffer buffer = ByteBuffer.allocate(data.length) - .put(data) - .flip() - .order(ByteOrder.LITTLE_ENDIAN); + .put(data); + + buffer.flip(); + buffer.order(ByteOrder.LITTLE_ENDIAN); int index = 0; int size = data.length; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java index 52585dd36f5f..72546d4e0f8f 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java @@ -145,6 +145,7 @@ public void publishesEvents() throws InterruptedException { assertTrue(holder.failedContexts.isEmpty()); } + @SuppressWarnings("unchecked") @Test public void publishesErrors() throws InterruptedException { // Arrange From be3025ccbb41f9d042eead4c913fdcd9a01c3583 Mon Sep 17 00:00:00 2001 From: Connie Date: Thu, 21 Jul 2022 12:42:40 -0700 
Subject: [PATCH 21/29] Removing final from inner classes. --- .../eventhubs/EventHubBufferedPartitionProducer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java index 7cae1fcbd4fa..2ed75c9cb933 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java @@ -146,7 +146,7 @@ private Flux publishEvents(Flux upstream) { /** * Static class to hold results. */ - private static final class PublishResult { + private static class PublishResult { private final EventDataBatch batch; private final Throwable error; @@ -156,7 +156,7 @@ private static final class PublishResult { } } - private static final class PublishResultSubscriber extends BaseSubscriber { + private static class PublishResultSubscriber extends BaseSubscriber { private static final long REQUEST = 1L; private final String partitionId; private final Consumer onSucceed; From be5514304de2eb87caa09a6e662b10cc0049a28c Mon Sep 17 00:00:00 2001 From: Connie Date: Thu, 21 Jul 2022 12:42:56 -0700 Subject: [PATCH 22/29] Removing use of var. 
--- .../com/azure/messaging/eventhubs/PartitionResolverTests.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java index 82c3c4748029..c932b4c098b5 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java @@ -30,7 +30,7 @@ public class PartitionResolverTests { public static Stream> partitionSetTestCases() { final ArrayList> arguments = new ArrayList<>(); - for (var index = 1; index < 8; ++index) { + for (int index = 1; index < 8; ++index) { final List partitions = IntStream.range(0, index).mapToObj(String::valueOf) .collect(Collectors.toList()); @@ -106,7 +106,7 @@ public void distributesRoundRobinFairlyConcurrent(List partitionsList) { // Create a function that assigns partitions in a loop and track them. Mono roundRobin = Mono.fromRunnable(() -> { - for (var index = 0; index < iterationCount; index++) { + for (int index = 0; index < iterationCount; index++) { assigned.add(resolver.assignRoundRobin(partitions)); } }); From 19b84ec2ed89c26c4da4ed009262c3181f747368 Mon Sep 17 00:00:00 2001 From: Connie Date: Thu, 21 Jul 2022 14:14:29 -0700 Subject: [PATCH 23/29] Add javadocs. 
--- .../azure/messaging/eventhubs/PartitionResolverTests.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java index c932b4c098b5..9cb351a9a7eb 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java @@ -73,6 +73,11 @@ public static Stream partitionHashTestCases() { return arguments.stream(); } + /** + * Tests that events are distributed equally given the set of partitions. + * + * @param partitionsList List of partitions to distribute all events between. + */ @ParameterizedTest @MethodSource("partitionSetTestCases") public void distributesRoundRobinFairly(List partitionsList) { From 4ff66e5aadb683d3c396375ef3f61d2edfc12cb9 Mon Sep 17 00:00:00 2001 From: Connie Date: Thu, 21 Jul 2022 14:14:48 -0700 Subject: [PATCH 24/29] Add default case. 
--- .../java/com/azure/messaging/eventhubs/PartitionResolver.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java index 73cfca33c0ad..4f8d1e902160 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java @@ -187,6 +187,8 @@ private static Hashed computeHash(byte[] data, int seed1, int seed2) { break; case 0: return new Hashed(c, b); + default: + break; } c ^= b; From 285c51723817a6583fb6fa016ace0098f5742940 Mon Sep 17 00:00:00 2001 From: Connie Date: Thu, 21 Jul 2022 14:33:27 -0700 Subject: [PATCH 25/29] Adding comments for cases meant to fallthrough --- .../eventhubs/EventDataAggregator.java | 3 ++- .../eventhubs/PartitionResolver.java | 21 ++++++++++++------- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java index 94eb5ca57409..2cff332bd74d 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java @@ -64,7 +64,8 @@ public void subscribe(CoreSubscriber actual) { batchSupplier, LOGGER); if (!downstreamSubscription.compareAndSet(null, subscription)) { - throw new IllegalArgumentException("Cannot resubscribe to multiple upstreams."); + throw LOGGER.logThrowableAsError(new IllegalArgumentException( + "Cannot resubscribe to multiple upstreams.")); } 
source.subscribe(subscription); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java index 4f8d1e902160..25430ed8bae8 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java @@ -103,13 +103,9 @@ static short generateHashCode(String partitionKey) { */ @SuppressWarnings("fallthrough") private static Hashed computeHash(byte[] data, int seed1, int seed2) { - int a; - int b; - int c; - - a = b = c = (0xdeadbeef + data.length + seed1); - c += seed2; - + int a = (0xdeadbeef + data.length + seed1); + int b = a; + int c = a + seed2; final ByteBuffer buffer = ByteBuffer.allocate(data.length) .put(data); @@ -159,27 +155,38 @@ private static Hashed computeHash(byte[] data, int seed1, int seed2) { b += buffer.getInt(index + 4); c += buffer.getInt(index + 8); break; + + // fallthrough case 11: c += data[index + 10] << 16; + // fallthrough case 10: c += data[index + 9] << 8; + // fallthrough case 9: c += data[index + 8]; case 8: b += buffer.getInt(index + 4); a += buffer.getInt(index); break; + + // fallthrough case 7: b += data[index + 6] << 16; + // fallthrough case 6: b += data[index + 5] << 8; + // fallthrough case 5: b += data[index + 4]; case 4: a += buffer.getInt(index); break; + + // fallthrough case 3: a += data[index + 2] << 16; + // fallthrough case 2: a += data[index + 1] << 8; case 1: From a3e291ea9540372ec99ec7ff6e2100317455d137 Mon Sep 17 00:00:00 2001 From: Connie Date: Thu, 21 Jul 2022 21:11:09 -0700 Subject: [PATCH 26/29] Fixing build breaks. 
--- .../java/com/azure/messaging/eventhubs/PartitionResolver.java | 3 +++ .../com/azure/messaging/eventhubs/PartitionResolverTests.java | 2 ++ 2 files changed, 5 insertions(+) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java index 25430ed8bae8..04a327bfcd77 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionResolver.java @@ -165,6 +165,7 @@ private static Hashed computeHash(byte[] data, int seed1, int seed2) { // fallthrough case 9: c += data[index + 8]; + // fallthrough case 8: b += buffer.getInt(index + 4); a += buffer.getInt(index); @@ -179,6 +180,7 @@ private static Hashed computeHash(byte[] data, int seed1, int seed2) { // fallthrough case 5: b += data[index + 4]; + // fallthrough case 4: a += buffer.getInt(index); break; @@ -189,6 +191,7 @@ private static Hashed computeHash(byte[] data, int seed1, int seed2) { // fallthrough case 2: a += data[index + 1] << 8; + // fallthrough case 1: a += data[index]; break; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java index 9cb351a9a7eb..6c731d2bb858 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/samples/java/com/azure/messaging/eventhubs/PartitionResolverTests.java @@ -153,6 +153,8 @@ public void distributesRoundRobinFairlyConcurrent(List partitionsList) { /** * Verifies that the same partition key is assigned to the same partition id. 
+ * + * @param partitionsList List of partitions. */ @ParameterizedTest @MethodSource("partitionSetTestCases") From 246d9d924d4952ae7580c33798365d501589da7e Mon Sep 17 00:00:00 2001 From: Connie Date: Thu, 21 Jul 2022 21:29:26 -0700 Subject: [PATCH 27/29] Adding an exclusion for spotbugs because fallthrough is intentional. --- .../src/main/resources/spotbugs/spotbugs-exclude.xml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml index 969b301ee7bd..4a1822f1e1fe 100755 --- a/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml +++ b/eng/code-quality-reports/src/main/resources/spotbugs/spotbugs-exclude.xml @@ -2751,4 +2751,11 @@ + + + + + + + From 30a0b62590eb497003e87093e29248e8ed3d2655 Mon Sep 17 00:00:00 2001 From: Connie Date: Sat, 23 Jul 2022 23:45:55 -0700 Subject: [PATCH 28/29] Wrap synchronous lock. --- .../messaging/eventhubs/EventDataAggregator.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java index 2cff332bd74d..c4270737b89a 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java @@ -201,21 +201,20 @@ private void updateOrPublishBatch(EventData eventData, boolean alwaysPublish) { publishDownstream(); return; } else if (eventData == null) { + // EventData will be null in the case when options.maxWaitTime() has elapsed and we want to push the + // batch downstream. 
return; } boolean added; synchronized (lock) { added = currentBatch.tryAdd(eventData); - } - - if (added) { - return; - } - publishDownstream(); + if (added) { + return; + } - synchronized (lock) { + publishDownstream(); added = currentBatch.tryAdd(eventData); } From 54df3ec378b661ca81b5dffc43103aedb7dc4a2e Mon Sep 17 00:00:00 2001 From: Connie Date: Sun, 24 Jul 2022 00:31:19 -0700 Subject: [PATCH 29/29] Adding documentation to EventDataAggregatora --- .../com/azure/messaging/eventhubs/EventDataAggregator.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java index c4270737b89a..da0448cfc2f6 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java @@ -162,8 +162,12 @@ public void onSubscribe(Subscription s) { @Override public void onNext(EventData eventData) { updateOrPublishBatch(eventData, false); + eventSink.emitNext(1L, Sinks.EmitFailureHandler.FAIL_FAST); + // When an EventDataBatch is pushed downstream, we decrement REQUESTED. However, if REQUESTED is still > 0, + // that means we did not publish the EventDataBatch (ie. because it was not full). We request another + // EventData upstream to try and fill this EventDataBatch and push it downstream. final long left = REQUESTED.get(this); if (left > 0) { subscription.request(1L); @@ -255,7 +259,7 @@ private void publishDownstream() { } }); - logger.verbose("Batch published. Requested batches left: {}", batchesLeft); + logger.verbose(previous + ": Batch published. Requested batches left: {}", batchesLeft); if (!isCompleted.get()) { this.currentBatch = batchSupplier.get();