diff --git a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md index d5430767cac9..01b81fcfcdbc 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md @@ -10,6 +10,9 @@ ### Bugs Fixed +- Introducing ReactorShim to proxy certain reactive operations to appropriate Reactor operators, these are the operations for which recent Reactor versions have more optimized operators compared to an older version, or same operators with breaking change across Reactor versions +- When available, using the backpressure aware windowTimeout operator through ReactorShim. ([23950](https://github.com/Azure/azure-sdk-for-java/issues/23950)) + ### Other Changes ## 5.12.2 (2022-07-07) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java index 2ad47952abb3..961ac1c59936 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java @@ -10,6 +10,7 @@ import com.azure.core.util.tracing.ProcessKind; import com.azure.messaging.eventhubs.implementation.PartitionProcessor; import com.azure.messaging.eventhubs.implementation.PartitionProcessorException; +import com.azure.messaging.eventhubs.implementation.ReactorShim; import com.azure.messaging.eventhubs.models.Checkpoint; import com.azure.messaging.eventhubs.models.CloseContext; import com.azure.messaging.eventhubs.models.CloseReason; @@ -244,8 +245,7 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi }); if (maxWaitTime != null) { - partitionEventFlux = receiver - .windowTimeout(maxBatchSize, maxWaitTime); + partitionEventFlux = ReactorShim.windowTimeout(receiver, maxBatchSize, maxWaitTime); } else { partitionEventFlux = receiver .window(maxBatchSize); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorShim.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorShim.java new file mode 100644 index 000000000000..4c068126ca26 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorShim.java @@ -0,0 +1,87 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs.implementation; + +import com.azure.core.util.logging.ClientLogger; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.time.Duration; + +/** + * A shim that proxies Reactor operator calls, for example, if the loaded Reactor has an optimized variant + * of a standard operator, then shim uses it else fallback to the standard variant, if there are breaking + * changes in operators among Reactor versions that SDK supports, then shim may expose a unified API for + * those operators. + */ +public final class ReactorShim { + private static final ClientLogger LOGGER = new ClientLogger(ReactorShim.class); + + /* Reactor Operator names */ + private static final String WINDOW_TIMEOUT_OPERATOR = "windowTimeout"; + /* Reactor Operator handles */ + private static final MethodHandle BACKPRESSURE_WINDOW_TIMEOUT_OPERATOR_HANDLE; + + static { + BACKPRESSURE_WINDOW_TIMEOUT_OPERATOR_HANDLE = lookupBackpressureWindowTimeoutOperator(); + } + + /** + * Split the {@code source} {@link Flux} sequence into multiple {@link Flux} windows containing + * {@code maxSize} elements (or less for the final window) and starting from the first item. + * Each {@link Flux} window will onComplete once it contains {@code maxSize} elements + * OR it has been open for the given {@link Duration} (as measured on the {@link Schedulers#parallel() parallel} + * Scheduler). + * + *

+ * If the loaded Reactor library has a backpressure-aware window-timeout operator then it will be used, + * which caps requests to the source by {@code maxSize} (i.e. prefetch), otherwise, the regular variant + * of window-timeout operator requesting unbounded demand will be used. + * + * @param maxSize the maximum number of items to emit in the window before closing it + * @param maxTime the maximum {@link Duration} since the window was opened before closing it + * + * @param the element type of the source {@link Flux}. + * @return a {@link Flux} of {@link Flux} windows based on element count and duration. + */ + public static Flux> windowTimeout(Flux source, int maxSize, Duration maxTime) { + if (BACKPRESSURE_WINDOW_TIMEOUT_OPERATOR_HANDLE == null) { + // optimized (backpressure) aware windowTimeout operator not available use standard variant. + return source.windowTimeout(maxSize, maxTime); + } + + try { + return ((Flux>) (BACKPRESSURE_WINDOW_TIMEOUT_OPERATOR_HANDLE.invoke(source, maxSize, maxTime, true))); + } catch (Throwable err) { + // 'java.lang.invoke' throws Throwable. Given 'Error' category represents a serious + // abnormal thread state throw it immediately else throw via standard azure-core Logger. + if (err instanceof Error) { + throw (Error) err; + } else if (err instanceof RuntimeException) { + throw LOGGER.logExceptionAsError((RuntimeException) err); + } else { + throw LOGGER.logExceptionAsError(new RuntimeException(err)); + } + } + } + + /** + * Try to obtain {@link MethodHandle} for backpressure aware windowTimeout Reactor operator. + * + * @return if the backpressure aware windowTimeout Reactor operator is available then return + * operator {@link MethodHandle} else null. + */ + private static MethodHandle lookupBackpressureWindowTimeoutOperator() { + try { + return MethodHandles.publicLookup().findVirtual(Flux.class, WINDOW_TIMEOUT_OPERATOR, + MethodType.methodType(Flux.class, int.class, Duration.class, boolean.class)); + } catch (IllegalAccessException | NoSuchMethodException err) { + LOGGER.verbose("Failed to retrieve MethodHandle for backpressure aware windowTimeout Reactor operator.", err); + } + return null; + } +} diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ReactorShimTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ReactorShimTest.java new file mode 100644 index 000000000000..316bac5d8644 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ReactorShimTest.java @@ -0,0 +1,118 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs.implementation; + +import org.junit.jupiter.api.Test; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +import java.time.Duration; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.LongStream; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * Tests for {@link ReactorShim}. + */ +public class ReactorShimTest { + + /** + * Test to validate the {@link ReactorShim#windowTimeout(Flux, int, Duration)} facade honor backpressure. + *

+ * Today the ReactorShim (type local to Event Hubs SDK) is always guaranteed to use Reactor 3.4.19 or above, which + * has backpressure aware windowTimeout. In the future, if the ReactorShim moves to azure-core, and if we decide to + * have a CI pipeline for the lower Reactor version (below 3.4.19), then this test needs to be adjusted to execute + * conditionally (i.e. run only for Reactor version >= 3.4.19 setup). + *

+ * This is the replica, with additional asserts, of windowTimeout test that Azure Event Hubs crew contributed to + * Reactor while developing the backpressure aware windowTimeout operator, which got integrated into the Reactor + * test suite. The test is not using a virtual Timer scheduler (May have to check with the Reactor team to + * understand if it is safe to do so for this time-involved operator, and not invalidate any cases). + */ + @Test + public void windowTimeoutShouldHonorBackpressure() throws InterruptedException { + // -- The Event Producer + // The producer emitting requested events to downstream but with a delay of 250ms between each emission. + // + final int eventProduceDelayInMillis = 250; + Flux producer = Flux.create(sink -> { + sink.onRequest(request -> { + if (request != Long.MAX_VALUE) { + LongStream.range(0, request) + .mapToObj(String::valueOf) + .forEach(message -> { + try { + TimeUnit.MILLISECONDS.sleep(eventProduceDelayInMillis); + } catch (InterruptedException e) { + // Expected if thread was in sleep while disposing the subscription. + } + sink.next(message); + }); + } else { + sink.error(new RuntimeException("No_Backpressure unsupported")); + } + }); + }).subscribeOn(Schedulers.boundedElastic()); + + // -- The Event Consumer + // The consumer using windowTimeout that batches maximum 10 events with a max wait time of 1 second. + // Given the Event producer produces at most 4 events per second (due to 250 ms delay between events), + // the consumer should receive 3-4 events. + // + final ConcurrentLinkedQueue batchSizes = new ConcurrentLinkedQueue<>(); + final int maxWindowSize = 10; + final int eventConsumeDelayInMillis = 0; + final Scheduler scheduler = Schedulers.newBoundedElastic(10, 10000, "queued-tasks"); + final AtomicReference errorReference = new AtomicReference<>(null); + final Semaphore isCompleted = new Semaphore(1); + isCompleted.acquire(); + + Disposable subscription = ReactorShim.windowTimeout(producer, maxWindowSize, Duration.ofSeconds(1)) + .concatMap(Flux::collectList, 0) + .publishOn(scheduler) + .subscribe(eventBatch -> { + batchSizes.add(eventBatch.size()); + for (String event : eventBatch) { + try { + TimeUnit.MILLISECONDS.sleep(eventConsumeDelayInMillis); + } catch (InterruptedException e) { + // Expected if thread was in sleep while disposing the subscription. + } + } + }, error -> { + errorReference.set(error); + isCompleted.release(); + }, () -> { + isCompleted.release(); + }); + + final Duration durationToPublish = Duration.ofSeconds(20); + try { + final boolean acquired = isCompleted.tryAcquire(durationToPublish.toMillis(), TimeUnit.MILLISECONDS); + if (acquired) { + if (errorReference.get() != null) { + fail("'isCompleted' should have been false because the producer" + + " should not be terminating, but terminated with an error:" + errorReference.get()); + } else { + fail("'isCompleted' should have been false because the producer" + + " should not be terminating, but terminated to completion."); + } + } + + for (Integer batchSize : batchSizes) { + assertTrue(batchSize <= maxWindowSize, "Unexpected batch size " + batchSize); + } + + } finally { + subscription.dispose(); + } + } +}