Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
807e69d
Adds flush to PartitionProcessor.
conniey Jul 21, 2022
054069a
Adds implementation to methods for async client.
conniey Jul 21, 2022
2e77e56
Fixes build breaks.
conniey Jul 21, 2022
63172dd
Remove idempotent retries for next release.
conniey Jul 22, 2022
fc359e1
Connect asynchronous client.
conniey Jul 22, 2022
ff496f6
Closing producer client after test case.
conniey Jul 22, 2022
f33f83f
Make default retry options package-private.
conniey Jul 24, 2022
c98b53c
Adding integration test for BufferedProducerAsyncClient.
conniey Jul 24, 2022
27551af
Adding partition id to EventDataAggregator
conniey Jul 24, 2022
ab0b80e
Adding defaults to builder and documentation.
conniey Jul 25, 2022
bfeff3a
Remove localizedBy usages.
conniey Jul 25, 2022
048d79a
Fixing switchMap logic.
conniey Jul 25, 2022
7123317
(WIP) Have PartitionProducer throw an UncheckedInterruptedException w…
conniey Jul 25, 2022
3831d70
Update PublishResultSubscriber to clear remaining queue items when cl…
conniey Jul 25, 2022
dbe977a
Delay computation of EventDataAggregator to prevent multiple instance…
conniey Jul 25, 2022
5068334
Adding support for flush().
conniey Jul 28, 2022
3fb89d3
Add emitResult constant.
conniey Jul 29, 2022
e8f1443
Complete EventDataAggregator when it is cancelled.
conniey Jul 29, 2022
02b68c0
Implement flush and clean up logging.
conniey Jul 29, 2022
46ca2a7
Fix spot bugs issues.
conniey Jul 29, 2022
eb84f1b
Add implementation for sync-client.
conniey Jul 29, 2022
4a246a3
Making non-implemented methods package-private.
conniey Jul 29, 2022
51c717c
Fix build break.
conniey Jul 29, 2022
5ba6f79
Add changelog entry.
conniey Jul 29, 2022
06f3dd3
Remove unused formatter.
conniey Jul 29, 2022
89fbee6
Updating timeout interval.
conniey Jul 29, 2022
4ae762e
Rename variable to isFlushing.
conniey Jul 29, 2022
d76460c
Remove else-if
conniey Jul 29, 2022
9454eae
Adding documentation.
conniey Aug 1, 2022
6d942bc
Update exceptions.
conniey Aug 1, 2022
2c25e0d
Clean up failing tests.
conniey Aug 1, 2022
92af5fd
Fixing test failure
conniey Aug 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@

### Features Added

- Added algorithm for mapping partition keys to partition ids.
- Added identifier to client. ([#22981](https://github.com/Azure/azure-sdk-for-java/issues/22981))
- Adds algorithm for mapping partition keys to partition ids.
- Added EventHubBufferedProducerAsyncClient and EventHubBufferedProducerClient

### Breaking Changes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static com.azure.messaging.eventhubs.implementation.ClientConstants.PARTITION_ID_KEY;

/**
* Aggregates {@link EventData} into {@link EventDataBatch} and pushes them downstream when:
*
Expand All @@ -33,20 +34,23 @@
*/
class EventDataAggregator extends FluxOperator<EventData, EventDataBatch> {
private static final ClientLogger LOGGER = new ClientLogger(EventDataAggregator.class);

private final AtomicReference<EventDataAggregatorMain> downstreamSubscription = new AtomicReference<>();
private final Supplier<EventDataBatch> batchSupplier;
private final String namespace;
private final BufferedProducerClientOptions options;
private final String partitionId;

/**
* Build a {@link FluxOperator} wrapper around the passed parent {@link Publisher}
*
* @param source the {@link Publisher} to decorate
*/
EventDataAggregator(Flux<? extends EventData> source, Supplier<EventDataBatch> batchSupplier,
String namespace, BufferedProducerClientOptions options) {
String namespace, BufferedProducerClientOptions options, String partitionId) {
super(source);

this.partitionId = partitionId;
this.batchSupplier = batchSupplier;
this.namespace = namespace;
this.options = options;
Expand All @@ -61,7 +65,7 @@ class EventDataAggregator extends FluxOperator<EventData, EventDataBatch> {
@Override
public void subscribe(CoreSubscriber<? super EventDataBatch> actual) {
final EventDataAggregatorMain subscription = new EventDataAggregatorMain(actual, namespace, options,
batchSupplier, LOGGER);
batchSupplier, partitionId, LOGGER);

if (!downstreamSubscription.compareAndSet(null, subscription)) {
throw LOGGER.logThrowableAsError(new IllegalArgumentException(
Expand All @@ -82,13 +86,13 @@ static class EventDataAggregatorMain implements Subscription, CoreSubscriber<Eve
private static final AtomicLongFieldUpdater<EventDataAggregatorMain> REQUESTED =
AtomicLongFieldUpdater.newUpdater(EventDataAggregatorMain.class, "requested");

private static final Duration MAX_TIME = Duration.ofMillis(Long.MAX_VALUE);

private final Sinks.Many<Long> eventSink;
private final Disposable disposable;

private final AtomicBoolean isCompleted = new AtomicBoolean(false);
private final CoreSubscriber<? super EventDataBatch> downstream;

private final String partitionId;
private final ClientLogger logger;
private final Supplier<EventDataBatch> batchSupplier;
private final String namespace;
Expand All @@ -97,25 +101,25 @@ static class EventDataAggregatorMain implements Subscription, CoreSubscriber<Eve
private Subscription subscription;
private EventDataBatch currentBatch;

private volatile Throwable lastError;

EventDataAggregatorMain(CoreSubscriber<? super EventDataBatch> downstream, String namespace,
BufferedProducerClientOptions options, Supplier<EventDataBatch> batchSupplier, ClientLogger logger) {
BufferedProducerClientOptions options, Supplier<EventDataBatch> batchSupplier, String partitionId,
ClientLogger logger) {
this.namespace = namespace;
this.downstream = downstream;
this.partitionId = partitionId;
this.logger = logger;
this.batchSupplier = batchSupplier;
this.currentBatch = batchSupplier.get();

this.eventSink = Sinks.many().unicast().onBackpressureError();
this.disposable = eventSink.asFlux()
.switchMap(value -> {
if (value == 0) {
return Flux.interval(MAX_TIME, MAX_TIME);
} else {
return Flux.interval(options.getMaxWaitTime(), options.getMaxWaitTime());
}
})
.subscribe(next -> {
logger.verbose("Time elapsed. Publishing batch.");
this.disposable = Flux.switchOnNext(eventSink.asFlux().map(e -> Flux.interval(options.getMaxWaitTime())
.takeUntil(index -> isCompleted.get())))
.subscribe(index -> {
logger.atVerbose()
.addKeyValue(PARTITION_ID_KEY, partitionId)
.log("Time elapsed. Attempt to publish downstream.");
updateOrPublishBatch(null, true);
});
}
Expand All @@ -140,7 +144,14 @@ public void request(long n) {
*/
@Override
public void cancel() {
if (!isCompleted.compareAndSet(false, true)) {
return;
}

// Do not keep requesting more events upstream
logger.atVerbose()
.addKeyValue(PARTITION_ID_KEY, partitionId)
.log("Disposing of aggregator.");
subscription.cancel();

updateOrPublishBatch(null, true);
Expand All @@ -166,7 +177,7 @@ public void onNext(EventData eventData) {
eventSink.emitNext(1L, Sinks.EmitFailureHandler.FAIL_FAST);

// When an EventDataBatch is pushed downstream, we decrement REQUESTED. However, if REQUESTED is still > 0,
// that means we did not publish the EventDataBatch (ie. because it was not full). We request another
// that means we did not publish the EventDataBatch (i.e. because it was not full). We request another
// EventData upstream to try and fill this EventDataBatch and push it downstream.
final long left = REQUESTED.get(this);
if (left > 0) {
Expand Down Expand Up @@ -268,6 +279,22 @@ private void publishDownstream() {
this.currentBatch = null;
}
}
} catch (EventHubBufferedPartitionProducer.UncheckedInterruptedException exception) {
logger.info("An exception occurred while trying to get a new batch.", exception);

if (this.lastError != null) {
logger.info("Exception has been set already, terminating EventDataAggregator.");

final Throwable error = Operators.onNextError(previous, exception, downstream.currentContext(),
subscription);

if (error != null) {
onError(error);
}
} else {
this.lastError = exception;
}

} catch (Throwable e) {
final Throwable error = Operators.onNextError(previous, e, downstream.currentContext(), subscription);

Expand Down
Loading