From fd05bda31c0de3fa0ed540709fbc814222ef964d Mon Sep 17 00:00:00 2001 From: Anu Thomas Chandy Date: Wed, 29 Jun 2022 19:19:45 -0700 Subject: [PATCH 1/6] Using new windowTimeout operator with backpressure support --- .../eventhubs/PartitionPumpManager.java | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java index 2ad47952abb3..5fdfbdf03cf8 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java @@ -28,6 +28,8 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; import java.time.Duration; import java.util.List; import java.util.Locale; @@ -65,6 +67,24 @@ class PartitionPumpManager { private static final int MAXIMUM_QUEUE_SIZE = 10000; private static final ClientLogger LOGGER = new ClientLogger(PartitionPumpManager.class); + private static final boolean HAS_BACKPRESSURE_WINDOW_TIMEOUT; + static { + final Class fluxClazz = reactor.core.publisher.Flux.class; + boolean hasBackPressureWindowTimeout = true; + try { + // The backpressure aware window-timeout operator is available from reactor-core v3.4.19, but + // the user app, or its transitive dependencies may downgrade the reactor-core, so fall back + // to the window-timeout API without back pressure to avoid runtime NoSuchMethodException. 
+ MethodHandles.publicLookup().findVirtual(fluxClazz, "windowTimeout", + MethodType.methodType(fluxClazz, int.class, Duration.class, boolean.class)); + } catch (IllegalAccessException | NoSuchMethodException error) { + hasBackPressureWindowTimeout = false; + LOGGER.verbose("Failed to locate backpressure aware variant of windowTimeout, " + + "falling back to non-backpressure variant.", error); + } + HAS_BACKPRESSURE_WINDOW_TIMEOUT = hasBackPressureWindowTimeout; + } + //TODO (conniey): Add a configurable scheduler size, at the moment we are creating a new elastic scheduler // for each partition pump that will have at most number of processors * 4. private final int schedulerSize = Runtime.getRuntime().availableProcessors() * 4; @@ -244,8 +264,13 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi }); if (maxWaitTime != null) { - partitionEventFlux = receiver - .windowTimeout(maxBatchSize, maxWaitTime); + if (HAS_BACKPRESSURE_WINDOW_TIMEOUT) { + partitionEventFlux = receiver + .windowTimeout(maxBatchSize, maxWaitTime, true); + } else { + partitionEventFlux = receiver + .windowTimeout(maxBatchSize, maxWaitTime); + } } else { partitionEventFlux = receiver .window(maxBatchSize); From dae8dc226adad70ebe8800624e82bbb665b7f668 Mon Sep 17 00:00:00 2001 From: Anu Thomas Chandy Date: Thu, 30 Jun 2022 12:10:58 -0700 Subject: [PATCH 2/6] Using Flux.class directly for windowTimeout lookup, using azure core libs defined to use v4.3.19 reactor-core --- eng/versioning/version_client.txt | 3 ++- sdk/eventhubs/azure-messaging-eventhubs/pom.xml | 4 ++-- .../com/azure/messaging/eventhubs/PartitionPumpManager.java | 5 ++--- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/eng/versioning/version_client.txt b/eng/versioning/version_client.txt index a8ebbb63e1b0..50dfb6604aa4 100644 --- a/eng/versioning/version_client.txt +++ b/eng/versioning/version_client.txt @@ -354,7 +354,8 @@ 
com.azure.resourcemanager:azure-resourcemanager-education;1.0.0-beta.1;1.0.0-bet com.azure.resourcemanager:azure-resourcemanager-orbital;1.0.0-beta.1;1.0.0-beta.2 com.azure.tools:azure-sdk-archetype;1.0.0;1.2.0-beta.1 com.azure.tools:azure-sdk-build-tool;1.0.0-beta.1;1.0.0-beta.2 - +unreleased_com.azure:azure-core-amqp;2.6.0-beta.1 +unreleased_com.azure:azure-core;1.30.0-beta.1 # Unreleased dependencies: Copy the entry from above, prepend "unreleased_" and remove the current # version. Unreleased dependencies are only valid for dependency versions. diff --git a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml index 09ff478cefbb..f2fec8e876e3 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/pom.xml +++ b/sdk/eventhubs/azure-messaging-eventhubs/pom.xml @@ -37,12 +37,12 @@ com.azure azure-core - 1.29.1 + 1.30.0-beta.1 com.azure azure-core-amqp - 2.5.2 + 2.6.0-beta.1 diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java index 5fdfbdf03cf8..8a7d2be91fb9 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java @@ -69,14 +69,13 @@ class PartitionPumpManager { private static final boolean HAS_BACKPRESSURE_WINDOW_TIMEOUT; static { - final Class fluxClazz = reactor.core.publisher.Flux.class; boolean hasBackPressureWindowTimeout = true; try { // The backpressure aware window-timeout operator is available from reactor-core v3.4.19, but // the user app, or its transitive dependencies may downgrade the reactor-core, so fall back // to the window-timeout API without back pressure to avoid runtime NoSuchMethodException. 
- MethodHandles.publicLookup().findVirtual(fluxClazz, "windowTimeout", - MethodType.methodType(fluxClazz, int.class, Duration.class, boolean.class)); + MethodHandles.publicLookup().findVirtual(Flux.class, "windowTimeout", + MethodType.methodType(Flux.class, int.class, Duration.class, boolean.class)); } catch (IllegalAccessException | NoSuchMethodException error) { hasBackPressureWindowTimeout = false; LOGGER.verbose("Failed to locate backpressure aware variant of windowTimeout, " From 073b3e4749ec0837e1af070a2a238e09f543b442 Mon Sep 17 00:00:00 2001 From: Anu Thomas Chandy Date: Fri, 1 Jul 2022 14:38:38 -0700 Subject: [PATCH 3/6] Improving reflection logic and moving it to ReactorShim --- .../eventhubs/implementation/ReactorShim.java | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorShim.java diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorShim.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorShim.java new file mode 100644 index 000000000000..4c068126ca26 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorShim.java @@ -0,0 +1,87 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package com.azure.messaging.eventhubs.implementation; + +import com.azure.core.util.logging.ClientLogger; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + +import java.lang.invoke.MethodHandle; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodType; +import java.time.Duration; + +/** + * A shim that proxies Reactor operator calls, for example, if the loaded Reactor has an optimized variant + * of a standard operator, then shim uses it else fallback to the standard variant, if there are breaking + * changes in operators among Reactor versions that SDK supports, then shim may expose a unified API for + * those operators. + */ +public final class ReactorShim { + private static final ClientLogger LOGGER = new ClientLogger(ReactorShim.class); + + /* Reactor Operator names */ + private static final String WINDOW_TIMEOUT_OPERATOR = "windowTimeout"; + /* Reactor Operator handles */ + private static final MethodHandle BACKPRESSURE_WINDOW_TIMEOUT_OPERATOR_HANDLE; + + static { + BACKPRESSURE_WINDOW_TIMEOUT_OPERATOR_HANDLE = lookupBackpressureWindowTimeoutOperator(); + } + + /** + * Split the {@code source} {@link Flux} sequence into multiple {@link Flux} windows containing + * {@code maxSize} elements (or less for the final window) and starting from the first item. + * Each {@link Flux} window will onComplete once it contains {@code maxSize} elements + * OR it has been open for the given {@link Duration} (as measured on the {@link Schedulers#parallel() parallel} + * Scheduler). + * + *

+ * If the loaded Reactor library has a backpressure-aware window-timeout operator then it will be used, + * which caps requests to the source by {@code maxSize} (i.e. prefetch), otherwise, the regular variant + * of window-timeout operator requesting unbounded demand will be used. + * + * @param maxSize the maximum number of items to emit in the window before closing it + * @param maxTime the maximum {@link Duration} since the window was opened before closing it + * + * @param the element type of the source {@link Flux}. + * @return a {@link Flux} of {@link Flux} windows based on element count and duration. + */ + public static Flux> windowTimeout(Flux source, int maxSize, Duration maxTime) { + if (BACKPRESSURE_WINDOW_TIMEOUT_OPERATOR_HANDLE == null) { + // optimized (backpressure) aware windowTimeout operator not available use standard variant. + return source.windowTimeout(maxSize, maxTime); + } + + try { + return ((Flux>) (BACKPRESSURE_WINDOW_TIMEOUT_OPERATOR_HANDLE.invoke(source, maxSize, maxTime, true))); + } catch (Throwable err) { + // 'java.lang.invoke' throws Throwable. Given 'Error' category represents a serious + // abnormal thread state throw it immediately else throw via standard azure-core Logger. + if (err instanceof Error) { + throw (Error) err; + } else if (err instanceof RuntimeException) { + throw LOGGER.logExceptionAsError((RuntimeException) err); + } else { + throw LOGGER.logExceptionAsError(new RuntimeException(err)); + } + } + } + + /** + * Try to obtain {@link MethodHandle} for backpressure aware windowTimeout Reactor operator. + * + * @return if the backpressure aware windowTimeout Reactor operator is available then return + * operator {@link MethodHandle} else null. 
+ */ + private static MethodHandle lookupBackpressureWindowTimeoutOperator() { + try { + return MethodHandles.publicLookup().findVirtual(Flux.class, WINDOW_TIMEOUT_OPERATOR, + MethodType.methodType(Flux.class, int.class, Duration.class, boolean.class)); + } catch (IllegalAccessException | NoSuchMethodException err) { + LOGGER.verbose("Failed to retrieve MethodHandle for backpressure aware windowTimeout Reactor operator.", err); + } + return null; + } +} From 74c14696799006a4f6537dd8464d29fbde6c77dd Mon Sep 17 00:00:00 2001 From: Anu Thomas Chandy Date: Fri, 1 Jul 2022 14:45:25 -0700 Subject: [PATCH 4/6] Replace inline reflection with ReactorShim --- .../eventhubs/PartitionPumpManager.java | 28 ++----------------- 1 file changed, 2 insertions(+), 26 deletions(-) diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java index 8a7d2be91fb9..961ac1c59936 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/PartitionPumpManager.java @@ -10,6 +10,7 @@ import com.azure.core.util.tracing.ProcessKind; import com.azure.messaging.eventhubs.implementation.PartitionProcessor; import com.azure.messaging.eventhubs.implementation.PartitionProcessorException; +import com.azure.messaging.eventhubs.implementation.ReactorShim; import com.azure.messaging.eventhubs.models.Checkpoint; import com.azure.messaging.eventhubs.models.CloseContext; import com.azure.messaging.eventhubs.models.CloseReason; @@ -28,8 +29,6 @@ import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; -import java.lang.invoke.MethodHandles; -import java.lang.invoke.MethodType; import java.time.Duration; import java.util.List; import java.util.Locale; @@ -67,23 
+66,6 @@ class PartitionPumpManager { private static final int MAXIMUM_QUEUE_SIZE = 10000; private static final ClientLogger LOGGER = new ClientLogger(PartitionPumpManager.class); - private static final boolean HAS_BACKPRESSURE_WINDOW_TIMEOUT; - static { - boolean hasBackPressureWindowTimeout = true; - try { - // The backpressure aware window-timeout operator is available from reactor-core v3.4.19, but - // the user app, or its transitive dependencies may downgrade the reactor-core, so fall back - // to the window-timeout API without back pressure to avoid runtime NoSuchMethodException. - MethodHandles.publicLookup().findVirtual(Flux.class, "windowTimeout", - MethodType.methodType(Flux.class, int.class, Duration.class, boolean.class)); - } catch (IllegalAccessException | NoSuchMethodException error) { - hasBackPressureWindowTimeout = false; - LOGGER.verbose("Failed to locate backpressure aware variant of windowTimeout, " - + "falling back to non-backpressure variant.", error); - } - HAS_BACKPRESSURE_WINDOW_TIMEOUT = hasBackPressureWindowTimeout; - } - //TODO (conniey): Add a configurable scheduler size, at the moment we are creating a new elastic scheduler // for each partition pump that will have at most number of processors * 4. 
private final int schedulerSize = Runtime.getRuntime().availableProcessors() * 4; @@ -263,13 +245,7 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi }); if (maxWaitTime != null) { - if (HAS_BACKPRESSURE_WINDOW_TIMEOUT) { - partitionEventFlux = receiver - .windowTimeout(maxBatchSize, maxWaitTime, true); - } else { - partitionEventFlux = receiver - .windowTimeout(maxBatchSize, maxWaitTime); - } + partitionEventFlux = ReactorShim.windowTimeout(receiver, maxBatchSize, maxWaitTime); } else { partitionEventFlux = receiver .window(maxBatchSize); From 49898d65a6e065a99654b01d7d461977ab9c5dac Mon Sep 17 00:00:00 2001 From: Anu Thomas Chandy Date: Tue, 5 Jul 2022 17:08:16 -0700 Subject: [PATCH 5/6] Adding ReactorShim test for backpressure aware windowTimeout. --- .../implementation/ReactorShimTest.java | 118 ++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ReactorShimTest.java diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ReactorShimTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ReactorShimTest.java new file mode 100644 index 000000000000..e82f83d42482 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ReactorShimTest.java @@ -0,0 +1,118 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. 
+ +package com.azure.messaging.eventhubs.implementation; + +import org.junit.jupiter.api.Test; +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.LongStream; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for {@link ReactorShim}. + */ +public class ReactorShimTest { + + /** + * Test to validate the {@link ReactorShim#windowTimeout(Flux, int, Duration)} facade honor backpressure. + *

+ * Today the ReactorShim (type local to Event Hubs SDK) is always guaranteed to use Reactor 3.4.19 or above, + * which has backpressure-aware windowTimeout. In the future, if the ReactorShim moves to azure-core, and if + * we decide to have a CI pipeline for a lower Reactor version (below 3.4.19), then this test needs to be + * adjusted to execute conditionally (i.e. run only for Reactor version >= 3.4.19 setup). + *

+ * This is the replica, with additional asserts, of windowTimeout test that Azure Event Hubs crew contributed + * to Reactor while developing the backpressure aware windowTimeout operator, which got integrated into + * the Reactor test suite. The test is not using a virtual Timer scheduler (May have to check with the Reactor + * team to understand if it is safe to do so for this time-involved operator, and not invalidate any cases). + */ + @Test + public void windowTimeoutShouldHonorBackpressure() throws InterruptedException { + // -- The Event Producer + // The producer emitting requested events to downstream but with a delay of 250ms between each emission. + // + final int eventProduceDelayInMillis = 250; + Flux producer = Flux.create(sink -> { + sink.onRequest(request -> { + if (request != Long.MAX_VALUE) { + LongStream.range(0, request) + .mapToObj(String::valueOf) + .forEach(message -> { + try { + TimeUnit.MILLISECONDS.sleep(eventProduceDelayInMillis); + } catch (InterruptedException e) { + // Expected if thread was in sleep while disposing the subscription. + } + sink.next(message); + }); + } else { + sink.error(new RuntimeException("No_Backpressure unsupported")); + } + }); + }).subscribeOn(Schedulers.boundedElastic()); + + // -- The Event Consumer + // The consumer using windowTimeout that batches maximum 10 events with a max wait time of 1 second. + // Given the Event producer produces at most 4 events per second (due to 250 ms delay between events), + // the consumer should receive 3-4 events. 
+ // + final List batchSizes = new ArrayList<>(300); + final int maxWindowSize = 10; + final int eventConsumeDelayInMillis = 0; + final Scheduler scheduler = Schedulers.newBoundedElastic(10, 10000, "queued-tasks"); + final AtomicReference errorReference = new AtomicReference<>(null); + final Semaphore isCompleted = new Semaphore(1); + isCompleted.acquire(); + + Disposable subscription = ReactorShim.windowTimeout(producer, maxWindowSize, Duration.ofSeconds(1)) + .concatMap(Flux::collectList, 0) + .publishOn(scheduler) + .subscribe(eventBatch -> { + batchSizes.add(eventBatch.size()); + for (String event : eventBatch) { + try { + TimeUnit.MILLISECONDS.sleep(eventConsumeDelayInMillis); + } catch (InterruptedException e) { + // Expected if thread was in sleep while disposing the subscription. + } + } + }, error -> { + errorReference.set(error); + isCompleted.release(); + }, () -> { + isCompleted.release(); + }); + + final Duration durationToPublish = Duration.ofSeconds(20); + try { + final boolean acquired = isCompleted.tryAcquire(durationToPublish.toMillis(), TimeUnit.MILLISECONDS); + if (acquired) { + if (errorReference.get() != null) { + assertFalse(true, "'isCompleted' should have been false because the producer" + + " should not be terminating, but terminated with an error:" + errorReference.get()); + } else { + assertFalse(true, "'isCompleted' should have been false because the producer" + + " should not be terminating, but terminated to completion."); + } + } + for (Integer batchSize : batchSizes) { + assertTrue(batchSize <= maxWindowSize, "Unexpected batch size " + batchSize); + } + + } finally { + subscription.dispose(); + } + } +} From 3c2514e23d932f3ad445dd6142717379e62a07ec Mon Sep 17 00:00:00 2001 From: Anu Thomas Chandy Date: Tue, 5 Jul 2022 18:03:55 -0700 Subject: [PATCH 6/6] Changelog update for ReactorShim --- sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md index 139f416f19e0..73a5f57b8d2e 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md +++ b/sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md @@ -8,6 +8,9 @@ ### Bugs Fixed +- Introducing ReactorShim to proxy certain reactive operations to the appropriate Reactor operators. These are operations for which recent Reactor versions have more optimized operators than older versions, or the same operators with breaking changes across Reactor versions. +- When available, using the backpressure-aware windowTimeout operator through ReactorShim. ([23950](https://github.com/Azure/azure-sdk-for-java/issues/23950)) + ### Other Changes ## 5.12.1 (2022-06-10)