Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
1a69f96
Updating property name to getMaxEventBufferLengthPerPartition
conniey Jul 18, 2022
665de4c
Adds an aggregator that publishes events based on full batches and time.
conniey Jul 18, 2022
638a736
Adding implementation for publishing events for a partition.
conniey Jul 18, 2022
bde5331
Adding license info.
conniey Jul 18, 2022
6f42173
Adding inline mock maker for final classes.
conniey Jul 18, 2022
7c12be1
Fixing recursive subscription.
conniey Jul 18, 2022
3d8ffff
Do not get additional batches if the current instance is completed.
conniey Jul 19, 2022
b6b2149
Adding tests.
conniey Jul 19, 2022
0f7353f
Rename to EventHubBufferedPartitionProducer
conniey Jul 19, 2022
5175dab
Applying retryWhen policy to Aggregator.
conniey Jul 19, 2022
95c24e7
Remove unused EventData field
conniey Jul 19, 2022
12c34e7
Adding documentation to mock helper.
conniey Jul 20, 2022
27d8665
Hack around the blocking call.
conniey Jul 20, 2022
e3299ab
Adding tests for EventHubBufferedPartitionProducerTest
conniey Jul 20, 2022
b7e4601
Populating PartitionProcessors for each partition id.
conniey Jul 21, 2022
6306048
Creating private static class to hold PublishResults from buffered pr…
conniey Jul 21, 2022
4c91c4a
Adding tests to ensure upstream requests are respected.
conniey Jul 21, 2022
5b16285
Add suppression for fallthrough
conniey Jul 21, 2022
8b70e4f
Use --no-cone in pipeline sparse checkout script (#29905)
azure-sdk Jul 11, 2022
bcb504a
Fix warnings.
conniey Jul 21, 2022
be3025c
Removing final from inner classes.
conniey Jul 21, 2022
be55143
Removing use of var.
conniey Jul 21, 2022
19b84ec
Add javadocs.
conniey Jul 21, 2022
4ff66e5
Add default case.
conniey Jul 21, 2022
285c517
Adding comments for cases meant to fallthrough
conniey Jul 21, 2022
a3e291e
Fixing build breaks.
conniey Jul 22, 2022
246d9d9
Adding an exclusion for spotbugs because fallthrough is intentional.
conniey Jul 22, 2022
30a0b62
Wrap synchronous lock.
conniey Jul 24, 2022
54df3ec
Adding documentation to EventDataAggregator
conniey Jul 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2751,4 +2751,11 @@
<Method name="getRecognizePiiEntitiesResponse"/>
<Bug pattern="BC_UNCONFIRMED_CAST_OF_RETURN_VALUE"/>
</Match>

<!-- This is intentional when calculating the Jenkins' lookup3 hash. -->
<Match>
<Class name="com.azure.messaging.eventhubs.PartitionResolver"/>
<Method name="computeHash"/>
<Bug pattern="SF_SWITCH_FALLTHROUGH"/>
</Match>
</FindBugsFilter>
6 changes: 4 additions & 2 deletions eng/common/pipelines/templates/steps/sparse-checkout.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,10 @@ steps:
Write-Host "git sparse-checkout init"
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for that breaking change in eng so it builds.

git sparse-checkout init

Write-Host "git sparse-checkout set '/*' '!/*/' '/eng'"
git sparse-checkout set '/*' '!/*/' '/eng'
# Set non-cone mode otherwise path filters will not work in git >= 2.37.0
# See https://github.blog/2022-06-27-highlights-from-git-2-37/#tidbits
Write-Host "git sparse-checkout set --no-cone '/*' '!/*/' '/eng'"
git sparse-checkout set --no-cone '/*' '!/*/' '/eng'
}

# Prevent wildcard expansion in Invoke-Expression (e.g. for checkout path '/*')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.EventHubBufferedProducerAsyncClient.BufferedProducerClientOptions;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
* Aggregates {@link EventData} into {@link EventDataBatch} and pushes them downstream when:
*
* <ul>
* <li>{@link BufferedProducerClientOptions#getMaxWaitTime()} elapses between events.</li>
* <li>{@link EventDataBatch} cannot hold any more events.</li>
* </ul>
*/
class EventDataAggregator extends FluxOperator<EventData, EventDataBatch> {
private static final ClientLogger LOGGER = new ClientLogger(EventDataAggregator.class);
private final AtomicReference<EventDataAggregatorMain> downstreamSubscription = new AtomicReference<>();
private final Supplier<EventDataBatch> batchSupplier;
private final String namespace;
private final BufferedProducerClientOptions options;

/**
* Build a {@link FluxOperator} wrapper around the passed parent {@link Publisher}
*
* @param source the {@link Publisher} to decorate
*/
EventDataAggregator(Flux<? extends EventData> source, Supplier<EventDataBatch> batchSupplier,
String namespace, BufferedProducerClientOptions options) {
super(source);

this.batchSupplier = batchSupplier;
this.namespace = namespace;
this.options = options;
}

/**
* Subscribes to events from this operator. Downstream subscribers invoke this method and subscribe to events from
* it.
*
* @param actual Downstream subscriber.
*/
@Override
public void subscribe(CoreSubscriber<? super EventDataBatch> actual) {
final EventDataAggregatorMain subscription = new EventDataAggregatorMain(actual, namespace, options,
batchSupplier, LOGGER);

if (!downstreamSubscription.compareAndSet(null, subscription)) {
throw LOGGER.logThrowableAsError(new IllegalArgumentException(
"Cannot resubscribe to multiple upstreams."));
}

source.subscribe(subscription);
}

/**
* Main implementation class for subscribing to the upstream source and publishing events downstream.
*/
static class EventDataAggregatorMain implements Subscription, CoreSubscriber<EventData> {
/**
* The number of requested EventDataBatches.
*/
private volatile long requested;
private static final AtomicLongFieldUpdater<EventDataAggregatorMain> REQUESTED =
AtomicLongFieldUpdater.newUpdater(EventDataAggregatorMain.class, "requested");

private static final Duration MAX_TIME = Duration.ofMillis(Long.MAX_VALUE);

private final Sinks.Many<Long> eventSink;
private final Disposable disposable;

private final AtomicBoolean isCompleted = new AtomicBoolean(false);
private final CoreSubscriber<? super EventDataBatch> downstream;
private final ClientLogger logger;
private final Supplier<EventDataBatch> batchSupplier;
private final String namespace;
private final Object lock = new Object();

private Subscription subscription;
private EventDataBatch currentBatch;

EventDataAggregatorMain(CoreSubscriber<? super EventDataBatch> downstream, String namespace,
BufferedProducerClientOptions options, Supplier<EventDataBatch> batchSupplier, ClientLogger logger) {
this.namespace = namespace;
this.downstream = downstream;
this.logger = logger;
this.batchSupplier = batchSupplier;
this.currentBatch = batchSupplier.get();

this.eventSink = Sinks.many().unicast().onBackpressureError();
this.disposable = eventSink.asFlux()
.switchMap(value -> {
if (value == 0) {
return Flux.interval(MAX_TIME, MAX_TIME);
} else {
return Flux.interval(options.getMaxWaitTime(), options.getMaxWaitTime());
}
})
.subscribe(next -> {
logger.verbose("Time elapsed. Publishing batch.");
updateOrPublishBatch(null, true);
});
}

/**
* Request a number of {@link EventDataBatch}.
*
* @param n Number of batches requested.
*/
@Override
public void request(long n) {
if (!Operators.validate(n)) {
return;
}

Operators.addCap(REQUESTED, this, n);
subscription.request(n);
}

/**
* Cancels the subscription upstream.
*/
@Override
public void cancel() {
// Do not keep requesting more events upstream
subscription.cancel();

updateOrPublishBatch(null, true);
downstream.onComplete();
disposable.dispose();
}

@Override
public void onSubscribe(Subscription s) {
if (subscription != null) {
logger.warning("Subscription was already set. Cancelling existing subscription.");
subscription.cancel();
} else {
subscription = s;
downstream.onSubscribe(this);
}
}

@Override
public void onNext(EventData eventData) {
updateOrPublishBatch(eventData, false);

eventSink.emitNext(1L, Sinks.EmitFailureHandler.FAIL_FAST);

// When an EventDataBatch is pushed downstream, we decrement REQUESTED. However, if REQUESTED is still > 0,
// that means we did not publish the EventDataBatch (ie. because it was not full). We request another
// EventData upstream to try and fill this EventDataBatch and push it downstream.
final long left = REQUESTED.get(this);
if (left > 0) {
subscription.request(1L);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have subscription.request(n) when downstream request, do we still need to request(1L) here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I removed it from this method.

}
}

@Override
public void onError(Throwable t) {
if (!isCompleted.compareAndSet(false, true)) {
Operators.onErrorDropped(t, downstream.currentContext());
return;
}

updateOrPublishBatch(null, true);
downstream.onError(t);
}

/**
* Upstream signals a completion.
*/
@Override
public void onComplete() {
if (isCompleted.compareAndSet(false, true)) {
updateOrPublishBatch(null, true);
downstream.onComplete();
}
}

/**
* @param eventData EventData to add to or null if there are no events to add to the batch.
* @param alwaysPublish {@code true} to always push batch downstream. {@code false}, otherwise.
*/
private void updateOrPublishBatch(EventData eventData, boolean alwaysPublish) {
if (alwaysPublish) {
publishDownstream();
return;
} else if (eventData == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add some log when eventData is null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment since it is a normal path after maxWaitTime has elapsed.

// EventData will be null in the case when options.maxWaitTime() has elapsed and we want to push the
// batch downstream.
return;
}

boolean added;
synchronized (lock) {
added = currentBatch.tryAdd(eventData);

if (added) {
return;
}

publishDownstream();
added = currentBatch.tryAdd(eventData);
}

if (!added) {
final AmqpException error = new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED,
"EventData exceeded maximum size.", new AmqpErrorContext(namespace));

onError(error);
}
}

/**
* Publishes batch downstream if there are events in the batch and updates it.
*/
private void publishDownstream() {
EventDataBatch previous = null;

try {
synchronized (lock) {
previous = this.currentBatch;

if (previous == null) {
logger.warning("Batch should not be null, setting a new batch.");

this.currentBatch = batchSupplier.get();
return;
} else if (previous.getEvents().isEmpty()) {
return;
}

downstream.onNext(previous);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need double check if downstream have other locks or may take a long time to avoid deadlock.
I check the producer send method, so far looks good.


final long batchesLeft = REQUESTED.updateAndGet(this, (v) -> {
if (v == Long.MAX_VALUE) {
return v;
} else {
return v - 1;
}
});

logger.verbose(previous + ": Batch published. Requested batches left: {}", batchesLeft);

if (!isCompleted.get()) {
this.currentBatch = batchSupplier.get();
} else {
logger.verbose("Aggregator is completed. Not setting another batch.");
this.currentBatch = null;
}
}
} catch (Throwable e) {
final Throwable error = Operators.onNextError(previous, e, downstream.currentContext(), subscription);

logger.warning("Unable to push batch downstream to publish.", error);

if (error != null) {
onError(error);
}
}
}
}
}
Loading