Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
741fa23
Sample showing handling of terminal error from ServiceBusReceiverAsyn…
anuchandy Jun 29, 2022
6199e19
Only Compile module-info.java in intermediate compile (#29701)
alzimmermsft Jun 29, 2022
7b50559
Rest Proxy Changes (#29671)
g2vinay Jun 29, 2022
7da0a26
Fix protocol policy test (#29706)
samvaity Jun 29, 2022
c0e5b40
Sync eng/common directory with azure-sdk-tools for PR 3491 (#29707)
azure-sdk Jun 29, 2022
f2a0d9c
[Automation] Generate Fluent Lite from workloads#package-2021-12-01-p…
azure-sdk Jun 30, 2022
f8f6e58
Increment versions for workloads releases (#29712)
azure-sdk Jun 30, 2022
fd05bda
Using new windowTimeout operator with backpressure support
anuchandy Jun 30, 2022
2488496
[Automation] Generate Fluent Lite from digitaltwins#package-2022-05 (…
azure-sdk Jun 30, 2022
f1ec63a
Increment versions for digitaltwins releases (#29715)
azure-sdk Jun 30, 2022
d52e9d7
[CommunicationIdentifier] Added support for rawId ⟷ CommunicationIden…
iaulakh Jun 30, 2022
04ac4f9
Refactor move servicebus message state key to azure-core-amqp (#29309)
ZejiaJiang Jun 30, 2022
d3370da
Incorporating sample feedback for the merged pr https://github.com/Az…
anuchandy Jun 30, 2022
516ae16
azure-search-documents Documentation and Linting Cleanup (#29168)
alzimmermsft Jun 30, 2022
4ff3653
Adds support to allow usage of none-cloneable MAC providers (#29719)
FabianMeiswinkel Jun 30, 2022
8d85737
Added encryption option to perf tests (#29703)
rickle-msft Jun 30, 2022
5265f25
Setting azure-cosmos version for hotfix release 4.32.1 (#29724)
FabianMeiswinkel Jun 30, 2022
27cbb9a
Prepare Core Libraries for July 2022 Release (#29720)
alzimmermsft Jun 30, 2022
e00634b
Testing the dev feed requires the bearer token (#29723)
JimSuplizio Jun 30, 2022
dae8dc2
Using Flux.class directly for windowTimeout lookup, using azure core …
anuchandy Jun 30, 2022
054df3f
BlobOutputStream#close handling Close Exception (#29417)
ibrahimrabab Jun 30, 2022
e672b7a
Increment versions for core releases (#29729)
azure-sdk Jun 30, 2022
7b49c08
Update README.md (#29145)
alzimmermsft Jun 30, 2022
648212e
Use core reporter in communication-callingserver (#29735)
kasobol-msft Jun 30, 2022
c86099f
Updating storage-blob's README to include changes for BlobOutputStrea…
ibrahimrabab Jun 30, 2022
1e1bd76
Prepare identity release (#29736)
billwert Jun 30, 2022
61979cc
Increment versions for identity releases (#29739)
azure-sdk Jul 1, 2022
9e12673
Upgrade spring-appconfiguration related version after release (#29745)
hui1110 Jul 1, 2022
7b85a20
ACR troubleshooting guide (#29647)
Jul 1, 2022
605530c
Prepare azure-search-documents for July 2022 Beta Release (#29754)
alzimmermsft Jul 1, 2022
cf0c662
Increment versions for search releases (#29755)
azure-sdk Jul 1, 2022
8164cb9
[Perf] Add pipeline for Core (#29741)
mikeharder Jul 1, 2022
d72cdc6
azure-search-documents July 2022 Patch Release Merge Back (#29757)
alzimmermsft Jul 1, 2022
3651767
Fix SyncRestProxy Double Serializing Request (#29758)
alzimmermsft Jul 1, 2022
466cc84
Increment package versions for cosmos releases (#29731)
azure-sdk Jul 1, 2022
aae4dc2
Update BearerTokenAuthenticationPolicy implementations with sync work…
samvaity Jul 1, 2022
073b3e4
Improving reflection logic and moving it to ReactorShim
anuchandy Jul 1, 2022
74c1469
Replace inline reflection with ReactorShim
anuchandy Jul 1, 2022
26d970e
Add Heap Dump on OOME to Storage CI Memory Tests (#29759)
alzimmermsft Jul 2, 2022
054da5f
add new code owner (#29773)
Netyyyy Jul 4, 2022
3d4108b
Update version after spring-cloud-azure:4.3.0 released (#29744)
hui1110 Jul 4, 2022
7296efc
Update verify release set (#29760)
JimSuplizio Jul 4, 2022
923837d
update step name (#29787)
Netyyyy Jul 5, 2022
7b9b494
update sdk/spring/spring-reference.yml (#29790)
hui1110 Jul 5, 2022
9668200
Update Spring Boot and Spring Cloud versions for the compatibility te…
Netyyyy Jul 5, 2022
cd55137
Enable verify-samples to handle multiple service directories (#29792)
azure-sdk Jul 5, 2022
e593a4d
Fix Attestation Readme Issue #28411 (#29333)
v-hongli1 Jul 5, 2022
b1ccf65
Add support for BinaryData for Form Recognizer (#29761)
samvaity Jul 5, 2022
527ffcf
listCheckpoints and listOwnership implementation (#29590)
anushkasingh16 Jul 5, 2022
c0322cf
Update mgmt toc structure and modify the service readme (#29802)
azure-sdk Jul 5, 2022
75f48dd
Adding option to use CF in batch mode with continuations and migrate …
FabianMeiswinkel Jul 5, 2022
0288b3f
Sync upstream main
anuchandy Jul 5, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion eng/versioning/version_client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,8 @@ com.azure.resourcemanager:azure-resourcemanager-education;1.0.0-beta.1;1.0.0-bet
com.azure.resourcemanager:azure-resourcemanager-orbital;1.0.0-beta.1;1.0.0-beta.2
com.azure.tools:azure-sdk-archetype;1.0.0;1.2.0-beta.1
com.azure.tools:azure-sdk-build-tool;1.0.0-beta.1;1.0.0-beta.2

unreleased_com.azure:azure-core-amqp;2.6.0-beta.1
unreleased_com.azure:azure-core;1.30.0-beta.1

# Unreleased dependencies: Copy the entry from above, prepend "unreleased_" and remove the current
# version. Unreleased dependencies are only valid for dependency versions.
Expand Down
4 changes: 2 additions & 2 deletions sdk/eventhubs/azure-messaging-eventhubs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,12 @@
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core</artifactId>
<version>1.29.1</version> <!-- {x-version-update;com.azure:azure-core;dependency} -->
<version>1.30.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
</dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-core-amqp</artifactId>
<version>2.5.2</version> <!-- {x-version-update;com.azure:azure-core-amqp;dependency} -->
<version>2.6.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core-amqp;dependency} -->
</dependency>

<!-- Test dependencies -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.azure.core.util.tracing.ProcessKind;
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.implementation.PartitionProcessorException;
import com.azure.messaging.eventhubs.implementation.ReactorShim;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.CloseContext;
import com.azure.messaging.eventhubs.models.CloseReason;
Expand Down Expand Up @@ -244,8 +245,7 @@ void startPartitionPump(PartitionOwnership claimedOwnership, Checkpoint checkpoi
});

if (maxWaitTime != null) {
partitionEventFlux = receiver
.windowTimeout(maxBatchSize, maxWaitTime);
partitionEventFlux = ReactorShim.windowTimeout(receiver, maxBatchSize, maxWaitTime);
} else {
partitionEventFlux = receiver
.window(maxBatchSize);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs.implementation;

import com.azure.core.util.logging.ClientLogger;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.time.Duration;

/**
* A shim that proxies Reactor operator calls, for example, if the loaded Reactor has an optimized variant
* of a standard operator, then shim uses it else fallback to the standard variant, if there are breaking
* changes in operators among Reactor versions that SDK supports, then shim may expose a unified API for
* those operators.
*/
public final class ReactorShim {
private static final ClientLogger LOGGER = new ClientLogger(ReactorShim.class);

/* Reactor Operator names */
private static final String WINDOW_TIMEOUT_OPERATOR = "windowTimeout";
/* Reactor Operator handles */
private static final MethodHandle BACKPRESSURE_WINDOW_TIMEOUT_OPERATOR_HANDLE;

static {
BACKPRESSURE_WINDOW_TIMEOUT_OPERATOR_HANDLE = lookupBackpressureWindowTimeoutOperator();
}

/**
* Split the {@code source} {@link Flux} sequence into multiple {@link Flux} windows containing
* {@code maxSize} elements (or less for the final window) and starting from the first item.
* Each {@link Flux} window will onComplete once it contains {@code maxSize} elements
* OR it has been open for the given {@link Duration} (as measured on the {@link Schedulers#parallel() parallel}
* Scheduler).
*
* <p>
* If the loaded Reactor library has a backpressure-aware window-timeout operator then it will be used,
* which caps requests to the source by {@code maxSize} (i.e. prefetch), otherwise, the regular variant
* of window-timeout operator requesting unbounded demand will be used.
*
* @param maxSize the maximum number of items to emit in the window before closing it
* @param maxTime the maximum {@link Duration} since the window was opened before closing it
*
* @param <T> the element type of the source {@link Flux}.
* @return a {@link Flux} of {@link Flux} windows based on element count and duration.
*/
public static <T> Flux<Flux<T>> windowTimeout(Flux<T> source, int maxSize, Duration maxTime) {
if (BACKPRESSURE_WINDOW_TIMEOUT_OPERATOR_HANDLE == null) {
// optimized (backpressure) aware windowTimeout operator not available use standard variant.
return source.windowTimeout(maxSize, maxTime);
}

try {
return ((Flux<Flux<T>>) (BACKPRESSURE_WINDOW_TIMEOUT_OPERATOR_HANDLE.invoke(source, maxSize, maxTime, true)));
} catch (Throwable err) {
// 'java.lang.invoke' throws Throwable. Given 'Error' category represents a serious
// abnormal thread state throw it immediately else throw via standard azure-core Logger.
if (err instanceof Error) {
throw (Error) err;
} else if (err instanceof RuntimeException) {
throw LOGGER.logExceptionAsError((RuntimeException) err);
} else {
throw LOGGER.logExceptionAsError(new RuntimeException(err));
}
}
}

/**
* Try to obtain {@link MethodHandle} for backpressure aware windowTimeout Reactor operator.
*
* @return if the backpressure aware windowTimeout Reactor operator is available then return
* operator {@link MethodHandle} else null.
*/
private static MethodHandle lookupBackpressureWindowTimeoutOperator() {
try {
return MethodHandles.publicLookup().findVirtual(Flux.class, WINDOW_TIMEOUT_OPERATOR,
MethodType.methodType(Flux.class, int.class, Duration.class, boolean.class));
} catch (IllegalAccessException | NoSuchMethodException err) {
LOGGER.verbose("Failed to retrieve MethodHandle for backpressure aware windowTimeout Reactor operator.", err);
}
return null;
}
}