Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@

### Bugs Fixed

- Introducing ReactorShim to proxy certain reactive operations to appropriate Reactor operators, these are the operations for which recent Reactor versions have more optimized operators compared to an older version, or same operators with breaking change across Reactor versions
- When available, using the backpressure aware windowTimeout operator through ReactorShim. ([23950](https://github.com/Azure/azure-sdk-for-java/issues/23950))

### Other Changes

## 5.12.2 (2022-07-07)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.core.util.tracing.ProcessKind;
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.implementation.PartitionProcessorException;
import com.azure.messaging.eventhubs.implementation.ReactorShim;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.CloseContext;
import com.azure.messaging.eventhubs.models.CloseReason;
Expand Down Expand Up @@ -244,8 +245,7 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi
});

if (maxWaitTime != null) {
partitionEventFlux = receiver
.windowTimeout(maxBatchSize, maxWaitTime);
partitionEventFlux = ReactorShim.windowTimeout(receiver, maxBatchSize, maxWaitTime);
} else {
partitionEventFlux = receiver
.window(maxBatchSize);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs.implementation;

import com.azure.core.util.logging.ClientLogger;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.time.Duration;

/**
* A shim that proxies Reactor operator calls, for example, if the loaded Reactor has an optimized variant
* of a standard operator, then shim uses it else fallback to the standard variant, if there are breaking
* changes in operators among Reactor versions that SDK supports, then shim may expose a unified API for
* those operators.
*/
public final class ReactorShim {
private static final ClientLogger LOGGER = new ClientLogger(ReactorShim.class);

/* Reactor Operator names */
private static final String WINDOW_TIMEOUT_OPERATOR = "windowTimeout";
/* Reactor Operator handles */
private static final MethodHandle BACKPRESSURE_WINDOW_TIMEOUT_OPERATOR_HANDLE;

static {
BACKPRESSURE_WINDOW_TIMEOUT_OPERATOR_HANDLE = lookupBackpressureWindowTimeoutOperator();
}

/**
* Split the {@code source} {@link Flux} sequence into multiple {@link Flux} windows containing
* {@code maxSize} elements (or less for the final window) and starting from the first item.
* Each {@link Flux} window will onComplete once it contains {@code maxSize} elements
* OR it has been open for the given {@link Duration} (as measured on the {@link Schedulers#parallel() parallel}
* Scheduler).
*
* <p>
* If the loaded Reactor library has a backpressure-aware window-timeout operator then it will be used,
* which caps requests to the source by {@code maxSize} (i.e. prefetch), otherwise, the regular variant
* of window-timeout operator requesting unbounded demand will be used.
*
* @param maxSize the maximum number of items to emit in the window before closing it
* @param maxTime the maximum {@link Duration} since the window was opened before closing it
*
* @param <T> the element type of the source {@link Flux}.
* @return a {@link Flux} of {@link Flux} windows based on element count and duration.
*/
public static <T> Flux<Flux<T>> windowTimeout(Flux<T> source, int maxSize, Duration maxTime) {
if (BACKPRESSURE_WINDOW_TIMEOUT_OPERATOR_HANDLE == null) {
// optimized (backpressure) aware windowTimeout operator not available use standard variant.
return source.windowTimeout(maxSize, maxTime);
}

try {
return ((Flux<Flux<T>>) (BACKPRESSURE_WINDOW_TIMEOUT_OPERATOR_HANDLE.invoke(source, maxSize, maxTime, true)));
} catch (Throwable err) {
// 'java.lang.invoke' throws Throwable. Given 'Error' category represents a serious
// abnormal thread state throw it immediately else throw via standard azure-core Logger.
if (err instanceof Error) {
throw (Error) err;
} else if (err instanceof RuntimeException) {
throw LOGGER.logExceptionAsError((RuntimeException) err);
} else {
throw LOGGER.logExceptionAsError(new RuntimeException(err));
}
}
}

/**
* Try to obtain {@link MethodHandle} for backpressure aware windowTimeout Reactor operator.
*
* @return if the backpressure aware windowTimeout Reactor operator is available then return
* operator {@link MethodHandle} else null.
*/
private static MethodHandle lookupBackpressureWindowTimeoutOperator() {
try {
return MethodHandles.publicLookup().findVirtual(Flux.class, WINDOW_TIMEOUT_OPERATOR,
MethodType.methodType(Flux.class, int.class, Duration.class, boolean.class));
} catch (IllegalAccessException | NoSuchMethodException err) {
LOGGER.verbose("Failed to retrieve MethodHandle for backpressure aware windowTimeout Reactor operator.", err);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it is worthwhile letting the user who might be reading this log know that it isn't a catastrophic outcome? I.e. by adding something like " - falling back to non-backpressure aware operator instead. It is recommended you consider upgrading your Reactor dependency if possible to at least version X.Y.Z"

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll second this, there are a few warning messages like this in azure-core that have been flagged as issues

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// @lmolkova (in case you missed this comment)

}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs.implementation;

import org.junit.jupiter.api.Test;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.LongStream;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

/**
* Tests for {@link ReactorShim}.
*/
public class ReactorShimTest {

/**
* Test to validate the {@link ReactorShim#windowTimeout(Flux, int, Duration)} facade honor backpressure.
* <p>
* Today the ReactorShim (type local to Event Hubs SDK) is always guaranteed to use Reactor 3.4.19 or above, which
* has backpressure aware windowTimeout. In the future, if the ReactorShim moves to azure-core, and if we decide to
* have a CI pipeline for the lower Reactor version (below 3.4.19), then this test needs to be adjusted to execute
* conditionally (i.e. run only for Reactor version >= 3.4.19 setup).
* <p>
* This is the replica, with additional asserts, of windowTimeout test that Azure Event Hubs crew contributed to
* Reactor while developing the backpressure aware windowTimeout operator, which got integrated into the Reactor
* test suite. The test is not using a virtual Timer scheduler (May have to check with the Reactor team to
* understand if it is safe to do so for this time-involved operator, and not invalidate any cases).
*/
@Test
public void windowTimeoutShouldHonorBackpressure() throws InterruptedException {
// -- The Event Producer
// The producer emitting requested events to downstream but with a delay of 250ms between each emission.
//
final int eventProduceDelayInMillis = 250;
Flux<String> producer = Flux.<String>create(sink -> {
sink.onRequest(request -> {
if (request != Long.MAX_VALUE) {
LongStream.range(0, request)
.mapToObj(String::valueOf)
.forEach(message -> {
try {
TimeUnit.MILLISECONDS.sleep(eventProduceDelayInMillis);
} catch (InterruptedException e) {
// Expected if thread was in sleep while disposing the subscription.
}
sink.next(message);
});
} else {
sink.error(new RuntimeException("No_Backpressure unsupported"));
}
});
}).subscribeOn(Schedulers.boundedElastic());

// -- The Event Consumer
// The consumer using windowTimeout that batches maximum 10 events with a max wait time of 1 second.
// Given the Event producer produces at most 4 events per second (due to 250 ms delay between events),
// the consumer should receive 3-4 events.
//
final ConcurrentLinkedQueue<Integer> batchSizes = new ConcurrentLinkedQueue<>();
final int maxWindowSize = 10;
final int eventConsumeDelayInMillis = 0;
final Scheduler scheduler = Schedulers.newBoundedElastic(10, 10000, "queued-tasks");
final AtomicReference<Throwable> errorReference = new AtomicReference<>(null);
final Semaphore isCompleted = new Semaphore(1);
isCompleted.acquire();

Disposable subscription = ReactorShim.windowTimeout(producer, maxWindowSize, Duration.ofSeconds(1))
.concatMap(Flux::collectList, 0)
.publishOn(scheduler)
.subscribe(eventBatch -> {
batchSizes.add(eventBatch.size());
for (String event : eventBatch) {
try {
TimeUnit.MILLISECONDS.sleep(eventConsumeDelayInMillis);
} catch (InterruptedException e) {
// Expected if thread was in sleep while disposing the subscription.
}
}
}, error -> {
errorReference.set(error);
isCompleted.release();
}, () -> {
isCompleted.release();
});

final Duration durationToPublish = Duration.ofSeconds(20);
try {
final boolean acquired = isCompleted.tryAcquire(durationToPublish.toMillis(), TimeUnit.MILLISECONDS);
if (acquired) {
if (errorReference.get() != null) {
fail("'isCompleted' should have been false because the producer"
+ " should not be terminating, but terminated with an error:" + errorReference.get());
} else {
fail("'isCompleted' should have been false because the producer"
+ " should not be terminating, but terminated to completion.");
}
}

for (Integer batchSize : batchSizes) {
assertTrue(batchSize <= maxWindowSize, "Unexpected batch size " + batchSize);
}

} finally {
subscription.dispose();
}
}
}