diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpSession.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpSession.java index 2ee596fd0390..68653566b24e 100644 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpSession.java +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/AmqpSession.java @@ -32,10 +32,10 @@ public interface AmqpSession extends EndpointStateNotifier, Closeable { * @param linkName Name of the link. * @param entityPath The entity path this link connects to when producing events. * @param timeout Timeout required for creating and opening AMQP link. - * @param retry The retry policy to use when sending messages. + * @param retryPolicy The retry policy to use when sending messages. * @return A newly created AMQP link. */ - Mono createProducer(String linkName, String entityPath, Duration timeout, Retry retry); + Mono createProducer(String linkName, String entityPath, Duration timeout, RetryPolicy retryPolicy); /** * Creates a new AMQP link that consumes events from the message broker. @@ -43,10 +43,10 @@ public interface AmqpSession extends EndpointStateNotifier, Closeable { * @param linkName Name of the link. * @param entityPath The entity path this link connects to, so that it may read events from the message broker. * @param timeout Timeout required for creating and opening an AMQP link. - * @param retry The retry policy to use when consuming messages. + * @param retryPolicy The retry policy to use when consuming messages. * @return A newly created AMQP link. */ - Mono createConsumer(String linkName, String entityPath, Duration timeout, Retry retry); + Mono createConsumer(String linkName, String entityPath, Duration timeout, RetryPolicy retryPolicy); /** * Removes an {@link AmqpLink} with the given {@code linkName}. diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/ExponentialRetry.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/ExponentialRetry.java deleted file mode 100644 index 87e136690bc0..000000000000 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/ExponentialRetry.java +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.amqp; - -import java.time.Duration; -import java.util.Objects; - -/** - * A policy to govern retrying of messaging operations in which the delay between retries will grow in an exponential - * manner, allowing more time to recover as the number of retries increases. - */ -public final class ExponentialRetry extends Retry { - private static final Duration TIMER_TOLERANCE = Duration.ofSeconds(1); - - private final Duration minBackoff; - private final Duration maxBackoff; - private final double retryFactor; - - /** - * Creates a new instance with a minimum and maximum retry period in addition to maximum number of retry attempts. - * - * @param minBackoff The minimum time period permissible for backing off between retries. - * @param maxBackoff The maximum time period permissible for backing off between retries. - * @param maxRetryCount The maximum number of retries allowed. - * @throws NullPointerException if {@code minBackoff} or {@code maxBackoff} is {@code null}. - */ - public ExponentialRetry(Duration minBackoff, Duration maxBackoff, int maxRetryCount) { - super(maxRetryCount); - Objects.requireNonNull(minBackoff); - Objects.requireNonNull(maxBackoff); - - this.minBackoff = minBackoff; - this.maxBackoff = maxBackoff; - - this.retryFactor = computeRetryFactor(); - } - - /** - * {@inheritDoc} - */ - @Override - protected Duration calculateNextRetryInterval(final Exception lastException, - final Duration remainingTime, - final int baseWaitSeconds, - final int retryCount) { - final double nextRetryInterval = Math.pow(retryFactor, (double) retryCount); - final long nextRetryIntervalSeconds = (long) nextRetryInterval; - final long nextRetryIntervalNano = (long) ((nextRetryInterval - (double) nextRetryIntervalSeconds) * 1000000000); - - if (remainingTime.getSeconds() < Math.max(nextRetryInterval, TIMER_TOLERANCE.getSeconds())) { - return null; - } - - final Duration retryAfter = minBackoff.plus(Duration.ofSeconds(nextRetryIntervalSeconds, nextRetryIntervalNano)); - return retryAfter.plus(Duration.ofSeconds(baseWaitSeconds)); - } - - private double computeRetryFactor() { - final long deltaBackoff = maxBackoff.minus(minBackoff).getSeconds(); - if (deltaBackoff <= 0 || super.getMaxRetryCount() <= 0) { - return 0; - } - return Math.log(deltaBackoff) / Math.log(super.getMaxRetryCount()); - } - - @Override - public int hashCode() { - return Objects.hash(maxBackoff, minBackoff, getMaxRetryCount(), getRetryCount()); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - - if (!(obj instanceof ExponentialRetry)) { - return false; - } - - ExponentialRetry other = (ExponentialRetry) obj; - - return this.maxBackoff.equals(other.maxBackoff) - && this.minBackoff.equals(other.minBackoff) - && this.getMaxRetryCount() == other.getMaxRetryCount() - && this.getRetryCount() == other.getRetryCount(); - } - - /** - * Creates a clone of this instance. - * - * The {@code minBackoff}, {@code maxBackoff}, and {@code maxRetryCount} are not cloned, but these objects are - * immutable and not subject to change. - * - * @return A clone of the {@link ExponentialRetry} instance. - */ - @SuppressWarnings("CloneDoesntCallSuperClone") - @Override - public Object clone() { - return new ExponentialRetry(minBackoff, maxBackoff, getMaxRetryCount()); - } -} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/ExponentialRetryPolicy.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/ExponentialRetryPolicy.java new file mode 100644 index 000000000000..bfbdcfabed5b --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/ExponentialRetryPolicy.java @@ -0,0 +1,93 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp; + +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; + +/** + * A policy to govern retrying of messaging operations in which the delay between retries will grow in an exponential + * manner, allowing more time to recover as the number of retries increases. + */ +public final class ExponentialRetryPolicy extends RetryPolicy { + private final double retryFactor; + + /** + * Creates a new instance with a minimum and maximum retry period in addition to maximum number of retry attempts. + * + * @param retryOptions The options to apply to this retry policy. + * @throws NullPointerException if {@code retryOptions} is {@code null}. + */ + public ExponentialRetryPolicy(RetryOptions retryOptions) { + super(retryOptions); + + this.retryFactor = computeRetryFactor(); + } + + /** + * Calculates the retry delay using exponential backoff. + * + * @param retryCount The number of attempts that have been made, including the initial attempt before any + * retries. + * @param baseDelay The delay to use for the basis of the exponential backoff. + * @param baseJitter The duration to use for the basis of the random jitter value. + * @param random The random number generator used to calculate the jitter. + * @return The duration to delay before retrying a request. + */ + @Override + protected Duration calculateRetryDelay(int retryCount, Duration baseDelay, Duration baseJitter, + ThreadLocalRandom random) { + final double jitterSeconds = random.nextDouble() * baseJitter.getSeconds(); + final double nextRetrySeconds = Math.pow(retryFactor, (double) retryCount); + final Double nextRetryNanos = (jitterSeconds + nextRetrySeconds) * NANOS_PER_SECOND; + + return baseDelay.plus(Duration.ofNanos(nextRetryNanos.longValue())); + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() { + return super.hashCode(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + return obj instanceof ExponentialRetryPolicy + && super.equals(obj); + } + + /** + * Creates a clone of this instance. + * + * @return A clone of the {@link ExponentialRetryPolicy} instance. + */ + @Override + public RetryPolicy clone() { + final RetryOptions cloned = getRetryOptions().clone(); + return new ExponentialRetryPolicy(cloned); + } + + private double computeRetryFactor() { + final RetryOptions options = getRetryOptions(); + final Duration maxBackoff = options.maxDelay(); + final Duration minBackoff = options.delay(); + final int maximumRetries = options.maxRetries(); + final long deltaBackoff = maxBackoff.minus(minBackoff).getSeconds(); + + if (deltaBackoff <= 0 || maximumRetries <= 0) { + return 0; + } + + return Math.log(deltaBackoff) / Math.log(maximumRetries); + } +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/FixedRetryPolicy.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/FixedRetryPolicy.java new file mode 100644 index 000000000000..56e84b7ea43d --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/FixedRetryPolicy.java @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp; + +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; + +/** + * A policy to govern retrying of messaging operations in which the base delay between retries remains the same. + */ +public final class FixedRetryPolicy extends RetryPolicy { + /** + * Creates an instance with the given retry options. + * + * @param retryOptions The options to set on this retry policy. + */ + public FixedRetryPolicy(RetryOptions retryOptions) { + super(retryOptions); + } + + /** + * Calculates the delay for a fixed backoff. + * + * @param retryCount The number of attempts that have been made, including the initial attempt before any + * retries. + * @param baseDelay The delay to use for the fixed backoff. + * @param baseJitter The duration to use for the basis of the random jitter value. + * @param random The random number generator used to calculate the jitter. + * @return The duration to delay before retrying a request. + */ + @Override + protected Duration calculateRetryDelay(int retryCount, Duration baseDelay, Duration baseJitter, ThreadLocalRandom random) { + final Double jitterNanos = random.nextDouble() * baseJitter.getSeconds() * RetryPolicy.NANOS_PER_SECOND; + final Duration jitter = Duration.ofNanos(jitterNanos.longValue()); + + return baseDelay.plus(jitter); + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() { + return super.hashCode(); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + return obj instanceof FixedRetryPolicy + && super.equals(obj); + } + + /** + * Creates a clone of this instance. + * + * @return A clone of the {@link FixedRetryPolicy} instance. + */ + @Override + public RetryPolicy clone() { + final RetryOptions cloned = getRetryOptions().clone(); + return new FixedRetryPolicy(cloned); + } +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/Retry.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/Retry.java deleted file mode 100644 index f88968c8341f..000000000000 --- a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/Retry.java +++ /dev/null @@ -1,154 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.amqp; - -import com.azure.core.amqp.exception.AmqpException; -import com.azure.core.amqp.exception.ErrorCondition; - -import java.time.Duration; -import java.util.concurrent.atomic.AtomicInteger; - -/** - * An abstract representation of a policy to govern retrying of messaging operations. - */ -public abstract class Retry implements Cloneable { - /** - * Default for the minimum time between retry attempts. - */ - public static final Duration DEFAULT_RETRY_MIN_BACKOFF = Duration.ofSeconds(0); - /** - * Default for the maximum time between retry attempts. - */ - public static final Duration DEFAULT_RETRY_MAX_BACKOFF = Duration.ofSeconds(30); - /** - * Default for the maximum number of retry attempts. - */ - public static final int DEFAULT_MAX_RETRY_COUNT = 10; - - /** - * Base sleep wait time. - */ - private static final int SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS = 4; - - private final AtomicInteger retryCount = new AtomicInteger(); - private final int maxRetryCount; - - /** - * Creates a new instance of Retry with the maximum retry count of {@code maxRetryCount} - * - * @param maxRetryCount The maximum number of retries allowed. - */ - public Retry(int maxRetryCount) { - this.maxRetryCount = maxRetryCount; - } - - /** - * Check if the existing exception is a retriable exception. - * - * @param exception An exception that was observed for the operation to be retried. - * @return true if the exception is a retriable exception, otherwise false. - */ - public static boolean isRetriableException(Exception exception) { - return (exception instanceof AmqpException) && ((AmqpException) exception).isTransient(); - } - - /** - * Creates a Retry policy that does not retry failed requests. - * - * @return A new Retry policy that does not retry failed requests. - */ - public static Retry getNoRetry() { - return new ExponentialRetry(Duration.ZERO, Duration.ZERO, 0); - } - - /** - * Creates a Retry policy that retries failed requests up to {@link #DEFAULT_MAX_RETRY_COUNT 10} times. As the - * number of retry attempts increase, the period between retry attempts increases. - * - * @return A new instance with the default Retry values configured. - */ - public static Retry getDefaultRetry() { - return new ExponentialRetry(DEFAULT_RETRY_MIN_BACKOFF, DEFAULT_RETRY_MAX_BACKOFF, DEFAULT_MAX_RETRY_COUNT); - } - - /** - * Increments the number of retry attempts and returns the previous number of retry counts. - * - * @return The number of retry attempts before it was incremented. - */ - public int incrementRetryCount() { - return retryCount.getAndIncrement(); - } - - /** - * Gets the current number of retry attempts for this instance. - * - * @return The current number of retry attempts. - */ - public int getRetryCount() { - return retryCount.get(); - } - - /** - * Resets the number of retry attempts for this instance. - */ - public void resetRetryInterval() { - retryCount.set(0); - } - - /** - * Gets the maximum number of retry attempts that are allowed. - * - * @return The maximum number of retry attempts. - */ - public int getMaxRetryCount() { - return maxRetryCount; - } - - /** - * Calculates the amount of time to delay before the next retry attempt. - * - * @param lastException The last exception that was observed for the operation to be retried. - * @param remainingTime The amount of time remaining for the cumulative timeout across retry attempts. - * @return The amount of time to delay before retrying the associated operation; if {@code null}, - * then the operation is no longer eligible to be retried. - */ - public Duration getNextRetryInterval(Exception lastException, Duration remainingTime) { - int baseWaitTime = 0; - - if (!isRetriableException(lastException) || retryCount.get() >= maxRetryCount) { - return null; - } - - if (!(lastException instanceof AmqpException)) { - return null; - } - - if (((AmqpException) lastException).getErrorCondition() == ErrorCondition.SERVER_BUSY_ERROR) { - baseWaitTime += SERVER_BUSY_BASE_SLEEP_TIME_IN_SECS; - } - - return this.calculateNextRetryInterval(lastException, remainingTime, baseWaitTime, this.getRetryCount()); - } - - @Override - public Object clone() throws CloneNotSupportedException { - return super.clone(); - } - - /** - * Allows a concrete retry policy implementation to offer a base retry interval to be used in - * the calculations performed by 'Retry.GetNextRetryInterval'. - * - * @param lastException The last exception that was observed for the operation to be retried. - * @param remainingTime The amount of time remaining for the cumulative timeout across retry attempts. - * @param baseWaitSeconds The number of seconds to base the suggested retry interval on; - * this should be used as the minimum interval returned under normal circumstances. - * @param retryCount The number of retries that have already been attempted. - * @return The amount of time to delay before retrying the associated operation; if {@code null}, - * then the operation is no longer eligible to be retried. - */ - protected abstract Duration calculateNextRetryInterval(Exception lastException, Duration remainingTime, - int baseWaitSeconds, int retryCount); -} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/RetryMode.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/RetryMode.java new file mode 100644 index 000000000000..dac126fe64d1 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/RetryMode.java @@ -0,0 +1,19 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp; + +/** + * The type of approach to apply when calculating the delay between retry attempts. + */ +public enum RetryMode { + /** + * Retry attempts happen at fixed intervals; each delay is a consistent duration. + */ + FIXED, + /** + * Retry attempts will delay based on a backoff strategy, where each attempt will increase the duration that it + * waits before retrying. + */ + EXPONENTIAL, +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/RetryOptions.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/RetryOptions.java new file mode 100644 index 000000000000..3733b69a92cd --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/RetryOptions.java @@ -0,0 +1,175 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp; + +import java.time.Duration; +import java.util.Objects; + +/** + * A set of options that can be specified to influence how retry attempts are made. + */ +public class RetryOptions { + private int maxRetries; + private Duration delay; + private Duration maxDelay; + private Duration tryTimeout; + private RetryMode retryMode; + + /** + * Creates an instance with the default retry options set. + */ + public RetryOptions() { + maxRetries = 3; + delay = Duration.ofMillis(800); + maxDelay = Duration.ofMinutes(1); + tryTimeout = Duration.ofMinutes(1); + retryMode = RetryMode.EXPONENTIAL; + } + + /** + * Sets the approach to use for calculating retry delays. + * + * @param retryMode The retry approach to use for calculating delays. + * @return The updated {@link RetryOptions} object. + */ + public RetryOptions retryMode(RetryMode retryMode) { + this.retryMode = retryMode; + return this; + } + + /** + * Sets the maximum number of retry attempts before considering the associated operation to have failed. + * + * @param numberOfRetries The maximum number of retry attempts. + * @return The updated {@link RetryOptions} object. + */ + public RetryOptions maxRetries(int numberOfRetries) { + this.maxRetries = numberOfRetries; + return this; + } + + /** + * Gets the delay between retry attempts for a fixed approach or the delay on which to base calculations for a + * backoff-approach. + * + * @param delay The delay between retry attempts. + * @return The updated {@link RetryOptions} object. + */ + public RetryOptions delay(Duration delay) { + this.delay = delay; + return this; + } + + /** + * Sets the maximum permissible delay between retry attempts. + * + * @param maximumDelay The maximum permissible delay between retry attempts. + * @return The updated {@link RetryOptions} object. + */ + public RetryOptions maxDelay(Duration maximumDelay) { + this.maxDelay = maximumDelay; + return this; + } + + /** + * Sets the maximum duration to wait for completion of a single attempt, whether the initial attempt or a retry. + * + * @param tryTimeout The maximum duration to wait for completion. + * @return The updated {@link RetryOptions} object. + */ + public RetryOptions tryTimeout(Duration tryTimeout) { + this.tryTimeout = tryTimeout; + return this; + } + + /** + * Gets the approach to use for calculating retry delays. + * + * @return The approach to use for calculating retry delays. + */ + public RetryMode retryMode() { + return retryMode; + } + + /** + * The maximum number of retry attempts before considering the associated operation to have failed. + * + * @return The maximum number of retry attempts before considering the associated operation to have failed. + */ + public int maxRetries() { + return maxRetries; + } + + /** + * Gets the delay between retry attempts for a fixed approach or the delay on which to base calculations for a + * backoff-approach. + * + * @return The delay between retry attempts. + */ + public Duration delay() { + return delay; + } + + /** + * Gets the maximum permissible delay between retry attempts. + * + * @return The maximum permissible delay between retry attempts. + */ + public Duration maxDelay() { + return maxDelay; + } + + /** + * Gets the maximum duration to wait for completion of a single attempt, whether the initial attempt or a retry. + * + * @return The maximum duration to wait for completion of a single attempt, whether the initial attempt or a retry. + */ + public Duration tryTimeout() { + return tryTimeout; + } + + /** + * Creates a new copy of the current instance, cloning its attributes into a new instance. + * + * @return A new copy of {@link RetryOptions}. + */ + public RetryOptions clone() { + return new RetryOptions() + .delay(delay) + .maxDelay(maxDelay) + .maxRetries(maxRetries) + .tryTimeout(tryTimeout) + .retryMode(retryMode); + } + + /** + * {@inheritDoc} + */ + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (!(obj instanceof RetryOptions)) { + return false; + } + + final RetryOptions other = (RetryOptions) obj; + + return this.maxRetries() == other.maxRetries() + && this.retryMode() == other.retryMode() + && Objects.equals(this.maxDelay(), other.maxDelay()) + && Objects.equals(this.delay(), other.delay()) + && Objects.equals(this.tryTimeout(), other.tryTimeout()); + } + + /** + * {@inheritDoc} + */ + @Override + public int hashCode() { + return Objects.hash(maxRetries, retryMode, maxDelay, delay, tryTimeout); + } +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/RetryPolicy.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/RetryPolicy.java new file mode 100644 index 000000000000..687d2b7f0f52 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/RetryPolicy.java @@ -0,0 +1,154 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp; + +import com.azure.core.amqp.exception.AmqpException; + +import java.time.Duration; +import java.util.Objects; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeoutException; + +import static com.azure.core.amqp.exception.ErrorCondition.SERVER_BUSY_ERROR; + +/** + * An abstract representation of a policy to govern retrying of messaging operations. + */ +public abstract class RetryPolicy { + static final long NANOS_PER_SECOND = 1000_000_000L; + + private static final double JITTER_FACTOR = 0.08; + // Base sleep wait time. + private static final Duration SERVER_BUSY_WAIT_TIME = Duration.ofSeconds(4); + + private final RetryOptions retryOptions; + private final Duration baseJitter; + + /** + * Creates an instance with the given retry options. If {@link RetryOptions#maxDelay()}, {@link + * RetryOptions#delay()}, or {@link RetryOptions#maxRetries()} is equal to {@link Duration#ZERO} or zero, requests + * failing with a retriable exception will not be retried. + * + * @param retryOptions The options to set on this retry policy. + * @throws NullPointerException if {@code retryOptions} is {@code null}. + */ + protected RetryPolicy(RetryOptions retryOptions) { + Objects.requireNonNull(retryOptions); + + this.retryOptions = retryOptions; + + // 1 second = 1.0 * 10^9 nanoseconds. + final Double jitterInNanos = retryOptions.delay().getSeconds() * JITTER_FACTOR * NANOS_PER_SECOND; + baseJitter = Duration.ofNanos(jitterInNanos.longValue()); + } + + /** + * Gets the set of options used to configure this retry policy. + * + * @return The set of options used to configure this retry policy. + */ + protected RetryOptions getRetryOptions() { + return retryOptions; + } + + /** + * Gets the maximum number of retry attempts. + * + * @return The maximum number of retry attempts. + */ + public int getMaxRetries() { + return retryOptions.maxRetries(); + } + + /** + * Calculates the amount of time to delay before the next retry attempt. + * + * @param lastException The last exception that was observed for the operation to be retried. + * @param retryCount The number of attempts that have been made, including the initial attempt before any + * retries. + * @return The amount of time to delay before retrying the associated operation; if {@code null}, then the operation + * is no longer eligible to be retried. + */ + public Duration calculateRetryDelay(Exception lastException, int retryCount) { + if (retryOptions.delay() == Duration.ZERO + || retryOptions.maxDelay() == Duration.ZERO + || retryCount > retryOptions.maxRetries()) { + return null; + } + + final Duration baseDelay; + if (lastException instanceof AmqpException && isRetriableException(lastException)) { + baseDelay = ((AmqpException) lastException).getErrorCondition() == SERVER_BUSY_ERROR + ? retryOptions.delay().plus(SERVER_BUSY_WAIT_TIME) + : retryOptions.delay(); + } else if (lastException instanceof TimeoutException) { + baseDelay = retryOptions.delay(); + } else { + baseDelay = null; + } + + if (baseDelay == null) { + return null; + } + + final Duration delay = calculateRetryDelay(retryCount, baseDelay, baseJitter, ThreadLocalRandom.current()); + + // If delay is smaller or equal to the maximum delay, return the maximum delay. + return delay.compareTo(retryOptions.maxDelay()) <= 0 + ? delay + : retryOptions.maxDelay(); + } + + /** + * Calculates the amount of time to delay before the next retry attempt based on the {@code retryCound}, {@code + * baseDelay}, and {@code baseJitter}. + * + * @param retryCount The number of attempts that have been made, including the initial attempt before any + * retries. + * @param baseDelay The base delay for a retry attempt. + * @param baseJitter The base jitter delay. + * @param random The random number generator. Can be utilised to calculate a random jitter value for the + * retry. + * @return The amount of time to delay before retrying to associated operation; or {@code null} if the it cannot be + * retried. + */ + protected abstract Duration calculateRetryDelay(int retryCount, Duration baseDelay, Duration baseJitter, + ThreadLocalRandom random); + + /** + * Creates a clone of the retry policy. + * + * @return A new clone of the retry policy. + */ + public abstract RetryPolicy clone(); + + @Override + public int hashCode() { + return Objects.hash(retryOptions); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (!(obj instanceof RetryPolicy)) { + return false; + } + + final RetryPolicy other = (RetryPolicy) obj; + return retryOptions.equals(other.retryOptions); + } + + /** + * Check if the existing exception is a retriable exception. + * + * @param exception An exception that was observed for the operation to be retried. + * @return true if the exception is a retriable exception, otherwise false. + */ + private static boolean isRetriableException(Exception exception) { + return (exception instanceof AmqpException) && ((AmqpException) exception).isTransient(); + } +} diff --git a/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java new file mode 100644 index 000000000000..bbf800bcf402 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/RetryUtil.java @@ -0,0 +1,81 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation; + +import com.azure.core.amqp.ExponentialRetryPolicy; +import com.azure.core.amqp.FixedRetryPolicy; +import com.azure.core.amqp.RetryOptions; +import com.azure.core.amqp.RetryPolicy; +import reactor.core.Exceptions; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.Locale; +import java.util.concurrent.TimeoutException; + +/** + * Helper class to help with retry policies. + */ +public class RetryUtil { + // So this class can't be instantiated. + private RetryUtil() { + } + + /** + * Given a set of {@link RetryOptions options}, creates the appropriate retry policy. + * + * @param options A set of options used to configure the retry policy. + * @return A new retry policy configured with the given {@code options}. + * @throws IllegalArgumentException If {@link RetryOptions#retryMode()} is not a supported mode. + */ + public static RetryPolicy getRetryPolicy(RetryOptions options) { + switch (options.retryMode()) { + case FIXED: + return new FixedRetryPolicy(options); + case EXPONENTIAL: + return new ExponentialRetryPolicy(options); + default: + throw new IllegalArgumentException( + String.format(Locale.ROOT, "Mode is not supported: %s", options.retryMode())); + } + } + + /** + * Given a {@link Flux} will apply the retry policy to it when the operation times out. + * + * @param source The publisher to apply the retry policy to. + * @return A publisher that returns the results of the {@link Flux} if any of the retry attempts are successful. + * Otherwise, propagates a {@link TimeoutException}. + */ + public static Flux withRetry(Flux source, Duration operationTimeout, RetryPolicy retryPolicy) { + return Flux.defer(() -> source.timeout(operationTimeout)) + .retryWhen(errors -> retry(errors, retryPolicy)); + } + + /** + * Given a {@link Mono} will apply the retry policy to it when the operation times out. + * + * @param source The publisher to apply the retry policy to. + * @return A publisher that returns the results of the {@link Flux} if any of the retry attempts are successful. + * Otherwise, propagates a {@link TimeoutException}. + */ + public static Mono withRetry(Mono source, Duration operationTimeout, RetryPolicy retryPolicy) { + return Mono.defer(() -> source.timeout(operationTimeout)) + .retryWhen(errors -> retry(errors, retryPolicy)); + } + + private static Flux retry(Flux source, RetryPolicy retryPolicy) { + return source.zipWith(Flux.range(1, retryPolicy.getMaxRetries() + 1), + (error, attempt) -> { + if (!(error instanceof TimeoutException) || attempt > retryPolicy.getMaxRetries()) { + throw Exceptions.propagate(error); + } + + //TODO (conniey): is it possible to add a logger here even though it is static? :/ + return retryPolicy.calculateRetryDelay((TimeoutException) error, attempt); + }) + .flatMap(Mono::delay); + } +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/ExponentialRetryPolicyTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/ExponentialRetryPolicyTest.java new file mode 100644 index 000000000000..fa6049ad6e28 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/ExponentialRetryPolicyTest.java @@ -0,0 +1,112 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp; + +import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.amqp.exception.ErrorCondition; +import com.azure.core.amqp.exception.ErrorContext; +import org.junit.Assert; +import org.junit.Test; + +import java.time.Duration; + +public class ExponentialRetryPolicyTest { + private final ErrorContext errorContext = new ErrorContext("test-namespace"); + private final AmqpException exception = new AmqpException(true, ErrorCondition.SERVER_BUSY_ERROR, "error message", errorContext); + private final Duration minBackoff = Duration.ofSeconds(15); + private final Duration maxBackoff = Duration.ofSeconds(60); + private final Duration tolerance = Duration.ofSeconds(1); + private final int retryAttempts = 5; + private final RetryOptions options = new RetryOptions() + .delay(minBackoff) + .maxDelay(maxBackoff) + .maxRetries(retryAttempts) + .retryMode(RetryMode.EXPONENTIAL); + + /** + * Verifies that when the service is busy and we retry an exception multiple times, the retry duration gets longer. + */ + @Test + public void retryDurationIncreases() { + // Arrange + + final ExponentialRetryPolicy retry = new ExponentialRetryPolicy(options); + + // Act + final Duration firstRetryInterval = retry.calculateRetryDelay(exception, 1); + final Duration secondRetryInterval = retry.calculateRetryDelay(exception, 2); + + // Assert + Assert.assertNotNull(firstRetryInterval); + Assert.assertNotNull(secondRetryInterval); + Assert.assertTrue(secondRetryInterval.toNanos() > firstRetryInterval.toNanos()); + } + + /** + * Verifies that we can clone the retry instance and it behaves the same as its original. + */ + @Test + public void retryCloneBehavesSame() { + // Arrange + final ExponentialRetryPolicy retry = new ExponentialRetryPolicy(options); + final ExponentialRetryPolicy clone = (ExponentialRetryPolicy) retry.clone(); + + final Duration retryInterval = retry.calculateRetryDelay(exception, 1); + final Duration cloneRetryInterval = clone.calculateRetryDelay(exception, 4); + + // Assert + Assert.assertNotNull(retryInterval); + Assert.assertNotNull(cloneRetryInterval); + + // The retry interval for the clone will be larger because we've incremented the retry count, so it should + // calculate a longer waiting period. + Assert.assertTrue(cloneRetryInterval.compareTo(retryInterval) > 0); + } + + /** + * Verify that two instances created with the same set of RetryOptions are equal. + */ + @Test + public void isEquals() { + // Arrange + final ExponentialRetryPolicy policy = new ExponentialRetryPolicy(options); + + final RetryOptions otherOptions = new RetryOptions() + .delay(minBackoff) + .maxDelay(maxBackoff) + .maxRetries(retryAttempts) + .retryMode(RetryMode.EXPONENTIAL); + final ExponentialRetryPolicy otherPolicy = new ExponentialRetryPolicy(otherOptions); + + // Assert + Assert.assertEquals(policy, otherPolicy); + Assert.assertEquals(policy.hashCode(), otherPolicy.hashCode()); + } + + @Test + public void retryClone() { + // Arrange + final ExponentialRetryPolicy retry = new ExponentialRetryPolicy(options); + final ExponentialRetryPolicy clone = (ExponentialRetryPolicy) retry.clone(); + final int retryCount = 1; + + // Act + final Duration retryInterval = retry.calculateRetryDelay(exception, retryCount); + final Duration cloneRetryInterval = clone.calculateRetryDelay(exception, retryCount); + + // Assert + Assert.assertNotSame(retry, clone); + Assert.assertEquals(retry, clone); + Assert.assertEquals(retry.hashCode(), clone.hashCode()); + + Assert.assertNotNull(retryInterval); + Assert.assertNotNull(cloneRetryInterval); + + // Assert that the cloned interval is within our jitter threshold. + final Duration minValue = retryInterval.minus(tolerance); + final Duration maxValue = retryInterval.plus(tolerance); + Assert.assertTrue(minValue.compareTo(cloneRetryInterval) < 0 + && maxValue.compareTo(cloneRetryInterval) > 0); + } +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/ExponentialRetryTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/ExponentialRetryTest.java deleted file mode 100644 index 1bf45032e090..000000000000 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/ExponentialRetryTest.java +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.amqp; - -import com.azure.core.amqp.exception.AmqpException; -import com.azure.core.amqp.exception.ErrorCondition; -import com.azure.core.amqp.exception.ErrorContext; -import org.junit.Assert; -import org.junit.Test; - -import java.time.Duration; - -public class ExponentialRetryTest { - private final ErrorContext errorContext = new ErrorContext("test-namespace"); - private final AmqpException exception = new AmqpException(true, ErrorCondition.SERVER_BUSY_ERROR, "error message", errorContext); - private final Duration minBackoff = Duration.ofSeconds(15); - private final Duration maxBackoff = Duration.ofSeconds(45); - private final int retryAttempts = 4; - - /** - * Verifies that when the service is busy and we retry an exception multiple times, the retry duration gets longer. - */ - @Test - public void retryDurationIncreases() { - // Arrange - final ExponentialRetry retry = new ExponentialRetry(minBackoff, maxBackoff, retryAttempts); - final Duration remainingTime = Duration.ofSeconds(60); - - // Act - retry.incrementRetryCount(); - final Duration firstRetryInterval = retry.getNextRetryInterval(exception, remainingTime); - Assert.assertNotNull(firstRetryInterval); - - retry.incrementRetryCount(); - final Duration leftoverTime = remainingTime.minus(firstRetryInterval); - final Duration secondRetryInterval = retry.getNextRetryInterval(exception, leftoverTime); - - // Assert - Assert.assertNotNull(secondRetryInterval); - Assert.assertTrue(secondRetryInterval.toNanos() > firstRetryInterval.toNanos()); - } - - /** - * Verifies that we can clone the retry instance and it behaves the same as its original. - */ - @Test - public void retryCloneBehavesSame() { - // Arrange - final ExponentialRetry retry = new ExponentialRetry(minBackoff, maxBackoff, retryAttempts); - final ExponentialRetry clone = (ExponentialRetry) retry.clone(); - - final Duration remainingTime = Duration.ofSeconds(60); - - retry.incrementRetryCount(); - final Duration retryInterval = retry.getNextRetryInterval(exception, remainingTime); - - clone.incrementRetryCount(); - clone.incrementRetryCount(); - clone.incrementRetryCount(); - final Duration cloneRetryInterval = clone.getNextRetryInterval(exception, remainingTime); - - // Assert - Assert.assertNotNull(retryInterval); - Assert.assertNotNull(cloneRetryInterval); - - // The retry interval for the clone will be larger because we've incremented the retry count, so it should - // calculate a longer waiting period. - Assert.assertTrue(cloneRetryInterval.toNanos() > retryInterval.toNanos()); - } - - @Test - public void retryClone() { - // Arrange - final ExponentialRetry retry = new ExponentialRetry(minBackoff, maxBackoff, retryAttempts); - final ExponentialRetry clone = (ExponentialRetry) retry.clone(); - - final Duration remainingTime = Duration.ofSeconds(60); - - retry.incrementRetryCount(); - final Duration retryInterval = retry.getNextRetryInterval(exception, remainingTime); - - clone.incrementRetryCount(); - final Duration cloneRetryInterval = clone.getNextRetryInterval(exception, remainingTime); - - // Assert - Assert.assertNotSame(retry, clone); - Assert.assertEquals(retry, clone); - Assert.assertEquals(retry.hashCode(), clone.hashCode()); - - Assert.assertNotNull(retryInterval); - Assert.assertNotNull(cloneRetryInterval); - Assert.assertEquals(retryInterval, cloneRetryInterval); - } -} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/FixedRetryPolicyTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/FixedRetryPolicyTest.java new file mode 100644 index 000000000000..1ed9c257ed08 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/FixedRetryPolicyTest.java @@ -0,0 +1,118 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp; + +import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.amqp.exception.ErrorCondition; +import com.azure.core.amqp.exception.ErrorContext; +import org.junit.Assert; +import org.junit.Test; + +import java.time.Duration; + +public class FixedRetryPolicyTest { + private final ErrorContext errorContext = new ErrorContext("test-namespace"); + private final AmqpException exception = new AmqpException(true, ErrorCondition.SERVER_BUSY_ERROR, "error message", errorContext); + private final Duration minBackoff = Duration.ofSeconds(15); + private final Duration maxBackoff = Duration.ofSeconds(60); + private final Duration tolerance = Duration.ofSeconds(1); + private final int retryAttempts = 5; + private final RetryOptions options = new RetryOptions() + .delay(minBackoff) + .maxDelay(maxBackoff) + .maxRetries(retryAttempts) + .retryMode(RetryMode.FIXED); + + /** + * Verifies that when the service is busy and we retry an exception multiple times, the retry duration gets longer. + */ + @Test + public void retryDurationIsTheSame() { + // Arrange + final FixedRetryPolicy retry = new FixedRetryPolicy(options); + + // Act + final Duration firstRetryInterval = retry.calculateRetryDelay(exception, 1); + final Duration secondRetryInterval = retry.calculateRetryDelay(exception, 2); + + // Assert + Assert.assertNotNull(firstRetryInterval); + Assert.assertNotNull(secondRetryInterval); + + // Assert that the second retry interval is within our jitter threshold. + final Duration minValue = firstRetryInterval.minus(tolerance); + final Duration maxValue = firstRetryInterval.plus(tolerance); + Assert.assertTrue(minValue.compareTo(secondRetryInterval) < 0 + && maxValue.compareTo(secondRetryInterval) > 0); + } + + /** + * Verifies that we can clone the retry instance and it behaves the same as its original. + */ + @Test + public void retryCloneBehavesSame() { + // Arrange + final FixedRetryPolicy retry = new FixedRetryPolicy(options); + final FixedRetryPolicy clone = (FixedRetryPolicy) retry.clone(); + + final Duration retryInterval = retry.calculateRetryDelay(exception, 1); + final Duration cloneRetryInterval = clone.calculateRetryDelay(exception, 4); + + // Assert + Assert.assertNotNull(retryInterval); + Assert.assertNotNull(cloneRetryInterval); + + // Assert that the cloned retry interval is within our jitter threshold. + final Duration minValue = retryInterval.minus(tolerance); + final Duration maxValue = retryInterval.plus(tolerance); + Assert.assertTrue(minValue.compareTo(cloneRetryInterval) < 0 + && maxValue.compareTo(cloneRetryInterval) > 0); + } + + /** + * Verify that two instances created with the same set of RetryOptions are equal. + */ + @Test + public void isEquals() { + // Arrange + final FixedRetryPolicy policy = new FixedRetryPolicy(options); + + final RetryOptions otherOptions = new RetryOptions() + .delay(minBackoff) + .maxDelay(maxBackoff) + .maxRetries(retryAttempts) + .retryMode(RetryMode.FIXED); + final FixedRetryPolicy otherPolicy = new FixedRetryPolicy(otherOptions); + + // Assert + Assert.assertEquals(policy, otherPolicy); + Assert.assertEquals(policy.hashCode(), otherPolicy.hashCode()); + } + + @Test + public void retryClone() { + // Arrange + final FixedRetryPolicy retry = new FixedRetryPolicy(options); + final FixedRetryPolicy clone = (FixedRetryPolicy) retry.clone(); + final int retryCount = 1; + + // Act + final Duration retryInterval = retry.calculateRetryDelay(exception, retryCount); + final Duration cloneRetryInterval = clone.calculateRetryDelay(exception, retryCount); + + // Assert + Assert.assertNotSame(retry, clone); + Assert.assertEquals(retry, clone); + Assert.assertEquals(retry.hashCode(), clone.hashCode()); + + Assert.assertNotNull(retryInterval); + Assert.assertNotNull(cloneRetryInterval); + + // Assert that the cloned interval is within our jitter threshold. + final Duration minValue = retryInterval.minus(tolerance); + final Duration maxValue = retryInterval.plus(tolerance); + Assert.assertTrue(minValue.compareTo(cloneRetryInterval) < 0 + && maxValue.compareTo(cloneRetryInterval) > 0); + } +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/RetryOptionsTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/RetryOptionsTest.java new file mode 100644 index 000000000000..e2af9cd90239 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/RetryOptionsTest.java @@ -0,0 +1,130 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp; + +import org.junit.Assert; +import org.junit.Test; + +import java.time.Duration; + +public class RetryOptionsTest { + /** + * Test there are defaults set when creating RetryOptions + */ + @Test + public void constructor() { + // Arrange + final Duration defaultTimeout = Duration.ofMinutes(1); + final int maxRetries = 3; + + // Act + final RetryOptions options = new RetryOptions(); + + // Assert + Assert.assertEquals(maxRetries, options.maxRetries()); + Assert.assertEquals(RetryMode.EXPONENTIAL, options.retryMode()); + Assert.assertEquals(defaultTimeout, options.maxDelay()); + Assert.assertEquals(defaultTimeout, options.tryTimeout()); + } + + /** + * Verifies we can set new properties. + */ + @Test + public void canSetProperties() { + // Arrange + final Duration delay = Duration.ofMillis(1000); + final Duration maxDelay = Duration.ofMinutes(10); + final Duration tryTimeout = Duration.ofMinutes(2); + final int retries = 10; + final RetryMode retryMode = RetryMode.FIXED; + final RetryOptions options = new RetryOptions(); + + // Act + final RetryOptions actual = options.retryMode(retryMode) + .maxDelay(maxDelay) + .delay(delay) + .maxRetries(retries) + .tryTimeout(tryTimeout); + + // Assert + Assert.assertEquals(delay, actual.delay()); + Assert.assertEquals(maxDelay, actual.maxDelay()); + Assert.assertEquals(tryTimeout, actual.tryTimeout()); + Assert.assertEquals(retries, actual.maxRetries()); + Assert.assertEquals(retryMode, actual.retryMode()); + } + + /** + * Verifies that we can clone the RetryOptions object, and its fields change independent of the original. + */ + @Test + public void canClone() { + // Arrange + final Duration delay = Duration.ofMillis(1000); + final Duration maxDelay = Duration.ofMinutes(10); + final Duration tryTimeout = Duration.ofMinutes(2); + final int retries = 10; + final RetryMode retryMode = RetryMode.FIXED; + + final Duration newDelay = Duration.ofMillis(700); + final Duration newMaxDelay = Duration.ofSeconds(90); + final Duration newTryTimeout = Duration.ofMinutes(2); + final int newRetries = 5; + final RetryMode newRetryMode = RetryMode.EXPONENTIAL; + + final RetryOptions original = new RetryOptions().retryMode(retryMode) + .maxDelay(maxDelay) + .delay(delay) + .maxRetries(retries) + .tryTimeout(tryTimeout); + + // Act + final RetryOptions clone = original.clone(); + Assert.assertNotNull(clone); + Assert.assertEquals(original, clone); + + final RetryOptions actual = clone + .retryMode(newRetryMode) + .maxDelay(newMaxDelay) + .delay(newDelay) + .maxRetries(newRetries) + .tryTimeout(newTryTimeout); + + // Assert + Assert.assertNotSame(original, actual); + Assert.assertEquals(delay, original.delay()); + Assert.assertEquals(maxDelay, original.maxDelay()); + Assert.assertEquals(tryTimeout, original.tryTimeout()); + Assert.assertEquals(retries, original.maxRetries()); + Assert.assertEquals(retryMode, original.retryMode()); + + Assert.assertEquals(newDelay, actual.delay()); + Assert.assertEquals(newMaxDelay, actual.maxDelay()); + Assert.assertEquals(newTryTimeout, actual.tryTimeout()); + Assert.assertEquals(newRetries, actual.maxRetries()); + Assert.assertEquals(newRetryMode, actual.retryMode()); + } + + @Test + public void isEqual() { + // Arrange + final RetryOptions first = new RetryOptions() + .retryMode(RetryMode.FIXED) + .maxDelay(Duration.ofMinutes(10)) + .delay(Duration.ofMillis(1000)) + .maxRetries(10) + .tryTimeout(Duration.ofMinutes(2)); + + final RetryOptions second = new RetryOptions() + .retryMode(RetryMode.FIXED) + .maxDelay(Duration.ofMinutes(10)) + .delay(Duration.ofMillis(1000)) + .maxRetries(10) + .tryTimeout(Duration.ofMinutes(2)); + + Assert.assertEquals(first, second); + Assert.assertEquals(first.hashCode(), second.hashCode()); + } +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/RetryPolicyTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/RetryPolicyTest.java new file mode 100644 index 000000000000..420d5c273ad0 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/RetryPolicyTest.java @@ -0,0 +1,140 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp; + +import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.amqp.exception.ErrorContext; +import org.junit.Assert; +import org.junit.Test; + +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeoutException; + +public class RetryPolicyTest { + private final ErrorContext errorContext = new ErrorContext("test-namespace"); + private final int maxRetries = 10; + private final Duration maxDelay = Duration.ofSeconds(120); + private final Duration delay = Duration.ofSeconds(20); + private final RetryOptions options = new RetryOptions() + .maxRetries(maxRetries) + .maxDelay(maxDelay) + .delay(delay); + + /** + * Verifies we retry on a retriable AmqpException. + */ + @Test + public void isRetriableException() { + // Arrange + final Exception exception = new AmqpException(true, "error message", errorContext); + final Duration expected = Duration.ofSeconds(40); + final int count = 2; + final RetryPolicy policy = new MockRetryPolicy(options, expected); + + // Act + final Duration actual = policy.calculateRetryDelay(exception, count); + + // Assert + Assert.assertEquals(expected, actual); + Assert.assertEquals(maxRetries, policy.getMaxRetries()); + } + + /** + * Verifies that a timeout exception will allow for retries. + */ + @Test + public void isTimeoutException() { + // Arrange + final Exception exception = new TimeoutException("test-message-timeout"); + final Duration expected = Duration.ofSeconds(40); + final int count = 2; + final RetryPolicy policy = new MockRetryPolicy(options, expected); + + // Act + final Duration actual = policy.calculateRetryDelay(exception, count); + + // Assert + Assert.assertEquals(expected, actual); + } + + /** + * Verifies that null is returned if the exception is not an AmqpException. + */ + @Test + public void notRetriableException() { + // Arrange + final Exception invalidException = new RuntimeException("invalid exception"); + final Duration expected = Duration.ofSeconds(40); + final int count = 2; + final RetryPolicy policy = new MockRetryPolicy(options, expected); + + // Act + final Duration actual = policy.calculateRetryDelay(invalidException, count); + + // Assert + Assert.assertNull(actual); + } + + /** + * Verifies that null is returned if the AmqpException is not transient. + */ + @Test + public void notRetriableExceptionNotTransient() { + // Arrange + final Exception invalidException = new AmqpException(false, "Some test exception", errorContext); + final Duration expected = Duration.ofSeconds(40); + final int count = 2; + final RetryPolicy policy = new MockRetryPolicy(options, expected); + + // Act + final Duration actual = policy.calculateRetryDelay(invalidException, count); + + // Assert + Assert.assertNull(actual); + } + + /** + * Verifies that we return {@link RetryOptions#maxDelay()} if the returned delay is larger than the maximum. + */ + @Test + public void returnsMaxDelayIfDelayLarger() { + // Arrange + final Exception exception = new AmqpException(true, "error message", errorContext); + final Duration returnedDelay = maxDelay.plus(Duration.ofMillis(50)); + final int count = 2; + final RetryPolicy policy = new MockRetryPolicy(options, returnedDelay); + + // Act + final Duration actual = policy.calculateRetryDelay(exception, count); + + // Assert + Assert.assertEquals(maxDelay, actual); + Assert.assertEquals(maxRetries, policy.getMaxRetries()); + } + + private class MockRetryPolicy extends RetryPolicy { + private final Duration expectedDuration; + + /** + * Creates an instance with the given retry options. + * + * @param retryOptions The options to set on this retry policy. + */ + MockRetryPolicy(RetryOptions retryOptions, Duration expectedDuration) { + super(retryOptions); + this.expectedDuration = expectedDuration; + } + + @Override + protected Duration calculateRetryDelay(int retryCount, Duration baseDelay, Duration baseJitter, ThreadLocalRandom random) { + return expectedDuration; + } + + @Override + public RetryPolicy clone() { + return new MockRetryPolicy(getRetryOptions(), expectedDuration); + } + } +} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/RetryTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/RetryTest.java deleted file mode 100644 index fa30f6d5cce2..000000000000 --- a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/RetryTest.java +++ /dev/null @@ -1,124 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -package com.azure.core.amqp; - -import com.azure.core.amqp.exception.AmqpException; -import com.azure.core.amqp.exception.ErrorCondition; -import com.azure.core.amqp.exception.ErrorContext; -import org.junit.Assert; -import org.junit.Test; - -import java.time.Duration; - -public class RetryTest { - private final ErrorContext errorContext = new ErrorContext("test-namespace"); - - /** - * Verifies that when the service is busy and we retry an exception multiple times, the retry duration gets longer. - */ - @Test - public void defaultRetryPolicy() { - // Arrange - final Retry retry = Retry.getDefaultRetry(); - final AmqpException exception = new AmqpException(true, ErrorCondition.SERVER_BUSY_ERROR, "error message", errorContext); - final Duration remainingTime = Duration.ofSeconds(60); - - // Act - retry.incrementRetryCount(); - final Duration firstRetryInterval = retry.getNextRetryInterval(exception, remainingTime); - Assert.assertNotNull(firstRetryInterval); - - retry.incrementRetryCount(); - final Duration leftoverTime = remainingTime.minus(firstRetryInterval); - final Duration secondRetryInterval = retry.getNextRetryInterval(exception, leftoverTime); - - // Assert - Assert.assertNotNull(secondRetryInterval); - Assert.assertTrue(secondRetryInterval.toNanos() > firstRetryInterval.toNanos()); - } - - /** - * Verifies we can increment the retry count. - */ - @Test - public void canIncrementRetryCount() { - Retry retry = Retry.getDefaultRetry(); - Assert.assertEquals(0, retry.getRetryCount()); - Assert.assertEquals(0, retry.incrementRetryCount()); - - Assert.assertEquals(1, retry.getRetryCount()); - Assert.assertEquals(1, retry.incrementRetryCount()); - - Assert.assertEquals(2, retry.getRetryCount()); - Assert.assertEquals(2, retry.incrementRetryCount()); - - retry.resetRetryInterval(); - - Assert.assertEquals(0, retry.getRetryCount()); - Assert.assertEquals(0, retry.incrementRetryCount()); - - Assert.assertEquals(1, retry.getRetryCount()); - } - - @Test - public void isRetriableException() { - final Exception exception = new AmqpException(true, "error message", errorContext); - Assert.assertTrue(Retry.isRetriableException(exception)); - } - - @Test - public void notRetriableException() { - final Exception invalidException = new RuntimeException("invalid exception"); - Assert.assertFalse(Retry.isRetriableException(invalidException)); - } - - @Test - public void notRetriableExceptionNotTransient() { - final Exception invalidException = new AmqpException(false, "Some test exception", errorContext); - Assert.assertFalse(Retry.isRetriableException(invalidException)); - } - - /** - * Verifies that using no retry policy does not allow us to retry a failed request. - */ - @Test - public void noRetryPolicy() { - // Arrange - final Retry noRetry = Retry.getNoRetry(); - final Exception exception = new AmqpException(true, "error message", errorContext); - final Duration remainingTime = Duration.ofSeconds(60); - - // Act - final Duration nextRetryInterval = noRetry.getNextRetryInterval(exception, remainingTime); - int retryCount = noRetry.incrementRetryCount(); - - // Assert - Assert.assertEquals(0, retryCount); - Assert.assertNull(nextRetryInterval); - } - - /** - * Verifies that if we exceed the number of allowed retry attempts, the next retry interval, even if there is time - * remaining, is null. - */ - @Test - public void excessMaxRetry() { - // Arrange - final Retry retry = Retry.getDefaultRetry(); - final Exception exception = new AmqpException(true, "error message", errorContext); - final Duration sixtySec = Duration.ofSeconds(60); - - // Simulates that we've tried to retry the max number of requests this allows. - for (int i = 0; i < retry.getMaxRetryCount(); i++) { - retry.incrementRetryCount(); - } - - // Act - final Duration nextRetryInterval = retry.getNextRetryInterval(exception, sixtySec); - - // Assert - Assert.assertEquals(Retry.DEFAULT_MAX_RETRY_COUNT, retry.getRetryCount()); - Assert.assertNull(nextRetryInterval); - } -} diff --git a/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java new file mode 100644 index 000000000000..0da0ac848499 --- /dev/null +++ b/sdk/core/azure-core-amqp/src/test/java/com/azure/core/amqp/implementation/RetryUtilTest.java @@ -0,0 +1,94 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.core.amqp.implementation; + +import com.azure.core.amqp.ExponentialRetryPolicy; +import com.azure.core.amqp.FixedRetryPolicy; +import com.azure.core.amqp.RetryMode; +import com.azure.core.amqp.RetryOptions; +import com.azure.core.amqp.RetryPolicy; +import com.azure.core.amqp.TransportType; +import org.junit.Assert; +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.time.Duration; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; + +public class RetryUtilTest { + @Test + public void getCorrectModeFixed() { + // Act + final RetryOptions retryOptions = new RetryOptions() + .retryMode(RetryMode.FIXED); + final RetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions); + + // Assert + Assert.assertNotNull(retryPolicy); + Assert.assertEquals(FixedRetryPolicy.class, retryPolicy.getClass()); + } + + @Test + public void getCorrectModeExponential() { + // Act + final RetryOptions retryOptions = new RetryOptions() + .retryMode(RetryMode.EXPONENTIAL); + final RetryPolicy retryPolicy = RetryUtil.getRetryPolicy(retryOptions); + + // Assert + Assert.assertNotNull(retryPolicy); + Assert.assertEquals(ExponentialRetryPolicy.class, retryPolicy.getClass()); + } + + @Test + public void withRetryFlux() { + // Arrange + final RetryOptions options = new RetryOptions() + .delay(Duration.ofSeconds(1)) + .maxRetries(2); + final Duration totalWaitTime = Duration.ofSeconds(options.maxRetries() * options.delay().getSeconds()); + final Duration timeout = Duration.ofMillis(500); + + final AtomicInteger resubscribe = new AtomicInteger(); + final RetryPolicy retryPolicy = new FixedRetryPolicy(options); + final Flux neverFlux = Flux.never() + .doOnSubscribe(s -> resubscribe.incrementAndGet()); + + // Act & Assert + StepVerifier.create(RetryUtil.withRetry(neverFlux, timeout, retryPolicy)) + .expectSubscription() + .thenAwait(totalWaitTime) + .expectError(TimeoutException.class) + .verify(); + + Assert.assertEquals(options.maxRetries() + 1, resubscribe.get()); + } + + @Test + public void withRetryMono() { + // Arrange + final RetryOptions options = new RetryOptions() + .delay(Duration.ofSeconds(1)) + .maxRetries(2); + final Duration totalWaitTime = Duration.ofSeconds(options.maxRetries() * options.delay().getSeconds()); + final Duration timeout = Duration.ofMillis(500); + + final AtomicInteger resubscribe = new AtomicInteger(); + final RetryPolicy retryPolicy = new FixedRetryPolicy(options); + final Mono neverFlux = Mono.never() + .doOnSubscribe(s -> resubscribe.incrementAndGet()); + + // Act & Assert + StepVerifier.create(RetryUtil.withRetry(neverFlux, timeout, retryPolicy)) + .expectSubscription() + .thenAwait(totalWaitTime) + .expectError(TimeoutException.class) + .verify(); + + Assert.assertEquals(options.maxRetries() + 1, resubscribe.get()); + } +} diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java index 06cc9902c901..94d22d31b068 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java @@ -9,6 +9,7 @@ import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.implementation.AmqpConstants; import com.azure.messaging.eventhubs.implementation.ErrorContextProvider; +import com.azure.messaging.eventhubs.models.BatchOptions; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java index 58dab86dc69d..b2206f0ac4e9 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubAsyncClient.java @@ -4,8 +4,10 @@ package com.azure.messaging.eventhubs; import com.azure.core.amqp.AmqpConnection; +import com.azure.core.amqp.RetryPolicy; import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.exception.ErrorContext; +import com.azure.core.amqp.implementation.RetryUtil; import com.azure.core.implementation.annotation.ReturnType; import com.azure.core.implementation.annotation.ServiceClient; import com.azure.core.implementation.annotation.ServiceMethod; @@ -86,15 +88,15 @@ public class EventHubAsyncClient implements Closeable { this.eventHubPath = connectionOptions.eventHubPath(); this.connectionId = StringUtil.getRandomString("MF"); this.connectionMono = Mono.fromCallable(() -> { - return (EventHubConnection) new ReactorConnection(connectionId, connectionOptions, provider, handlerProvider, new ResponseMapper()); + return (EventHubConnection) new ReactorConnection(connectionId, connectionOptions, provider, + handlerProvider, new ResponseMapper()); }).doOnSubscribe(c -> hasConnection.set(true)) .cache(); this.defaultProducerOptions = new EventHubProducerOptions() - .retry(connectionOptions.retryPolicy()) - .timeout(connectionOptions.timeout()); + .retry(connectionOptions.retry()); this.defaultConsumerOptions = new EventHubConsumerOptions() - .retry(connectionOptions.retryPolicy()) + .retry(connectionOptions.retry()) .scheduler(connectionOptions.scheduler()); } @@ -156,12 +158,10 @@ public EventHubProducer createProducer() { public EventHubProducer createProducer(EventHubProducerOptions options) { Objects.requireNonNull(options); - final EventHubProducerOptions clonedOptions = (EventHubProducerOptions) options.clone(); - if (clonedOptions.timeout() == null) { - clonedOptions.timeout(connectionOptions.timeout()); - } + final EventHubProducerOptions clonedOptions = options.clone(); + if (clonedOptions.retry() == null) { - clonedOptions.retry(connectionOptions.retryPolicy()); + clonedOptions.retry(connectionOptions.retry()); } final String entityPath; @@ -175,10 +175,13 @@ public EventHubProducer createProducer(EventHubProducerOptions options) { linkName = StringUtil.getRandomString("PS"); } - final Mono amqpLinkMono = connectionMono.flatMap(connection -> connection.createSession(entityPath)) + final Mono amqpLinkMono = connectionMono + .flatMap(connection -> connection.createSession(entityPath)) .flatMap(session -> { - logger.info("Creating producer."); - return session.createProducer(linkName, entityPath, clonedOptions.timeout(), clonedOptions.retry()) + logger.verbose("Creating producer for {}", entityPath); + final RetryPolicy retryPolicy = RetryUtil.getRetryPolicy(clonedOptions.retry()); + + return session.createProducer(linkName, entityPath, clonedOptions.retry().tryTimeout(), retryPolicy) .cast(AmqpSendLink.class); }); @@ -254,18 +257,24 @@ public EventHubConsumer createConsumer(String consumerGroup, String partitionId, clonedOptions.scheduler(connectionOptions.scheduler()); } if (clonedOptions.retry() == null) { - clonedOptions.retry(connectionOptions.retryPolicy()); + clonedOptions.retry(connectionOptions.retry()); } final String linkName = StringUtil.getRandomString("PR"); - final String entityPath = String.format(Locale.US, RECEIVER_ENTITY_PATH_FORMAT, eventHubPath, consumerGroup, partitionId); + final String entityPath = + String.format(Locale.US, RECEIVER_ENTITY_PATH_FORMAT, eventHubPath, consumerGroup, partitionId); final Mono receiveLinkMono = connectionMono.flatMap(connection -> { return connection.createSession(entityPath).cast(EventHubSession.class); }).flatMap(session -> { - logger.info("Creating consumer."); - return session.createConsumer(linkName, entityPath, getExpression(eventPosition), connectionOptions.timeout(), - clonedOptions.retry(), options.ownerLevel(), options.identifier()).cast(AmqpReceiveLink.class); + logger.verbose("Creating consumer for path: {}", entityPath); + + logger.verbose("Creating producer for {}", entityPath); + final RetryPolicy retryPolicy = RetryUtil.getRetryPolicy(clonedOptions.retry()); + + return session.createConsumer(linkName, entityPath, getExpression(eventPosition), + clonedOptions.retry().tryTimeout(), retryPolicy, options.ownerLevel(), options.identifier()) + .cast(AmqpReceiveLink.class); }); return new EventHubConsumer(receiveLinkMono, clonedOptions); @@ -279,7 +288,7 @@ public EventHubConsumer createConsumer(String consumerGroup, String partitionId, public void close() { if (hasConnection.getAndSet(false)) { try { - final AmqpConnection connection = connectionMono.block(connectionOptions.timeout()); + final AmqpConnection connection = connectionMono.block(connectionOptions.retry().tryTimeout()); if (connection != null) { connection.close(); } diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java index a7152dd1e525..1491a5f9400e 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubClientBuilder.java @@ -3,7 +3,7 @@ package com.azure.messaging.eventhubs; -import com.azure.core.amqp.Retry; +import com.azure.core.amqp.RetryOptions; import com.azure.core.amqp.TransportType; import com.azure.core.credentials.TokenCredential; import com.azure.core.exception.AzureException; @@ -27,7 +27,6 @@ import java.net.Proxy; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; -import java.time.Duration; import java.util.Locale; import java.util.Objects; @@ -49,7 +48,7 @@ * * {@codesnippet com.azure.messaging.eventhubs.eventhubclientbuilder.connectionstring#string} * - *

Creating an {@link EventHubAsyncClient} using Event Hub with no {@link Retry}, different timeout and new + *

Creating an {@link EventHubAsyncClient} using Event Hub with no retry, different timeout and new * Scheduler

* * {@codesnippet com.azure.messaging.eventhubs.eventhubclientbuilder.retry-timeout-scheduler} @@ -60,12 +59,13 @@ public class EventHubClientBuilder { private static final String AZURE_EVENT_HUBS_CONNECTION_STRING = "AZURE_EVENT_HUBS_CONNECTION_STRING"; + private static final RetryOptions DEFAULT_RETRY = new RetryOptions() + .tryTimeout(ClientConstants.OPERATION_TIMEOUT); private TokenCredential credentials; private Configuration configuration; - private Duration timeout; private ProxyConfiguration proxyConfiguration; - private Retry retry; + private RetryOptions retryOptions; private Scheduler scheduler; private TransportType transport; private String host; @@ -155,8 +155,8 @@ public EventHubClientBuilder connectionString(String connectionString, String ev /** * Sets the configuration store that is used during construction of the service client. * - * If not specified, the default configuration store is used to configure the {@link EventHubAsyncClient}. Use {@link - * Configuration#NONE} to bypass using configuration settings during construction. + * If not specified, the default configuration store is used to configure the {@link EventHubAsyncClient}. Use + * {@link Configuration#NONE} to bypass using configuration settings during construction. * * @param configuration The configuration store used to configure the {@link EventHubAsyncClient}. * @return The updated {@link EventHubClientBuilder} object. @@ -232,31 +232,19 @@ public EventHubClientBuilder transportType(TransportType transport) { } /** - * Sets the default operation timeout for operations performed using {@link EventHubAsyncClient} and {@link - * EventHubConsumer} such as starting the communication link with the service and sending messages. + * Sets the retry policy for {@link EventHubAsyncClient}. If not specified, the default retry options are used. * - * @param timeout Duration for operation timeout. + * @param retryOptions The retry policy to use. * @return The updated {@link EventHubClientBuilder} object. */ - public EventHubClientBuilder timeout(Duration timeout) { - this.timeout = timeout; + public EventHubClientBuilder retry(RetryOptions retryOptions) { + this.retryOptions = retryOptions; return this; } /** - * Sets the retry policy for {@link EventHubAsyncClient}. If not specified, {@link Retry#getDefaultRetry()} is used. - * - * @param retry The retry policy to use. - * @return The updated {@link EventHubClientBuilder} object. - */ - public EventHubClientBuilder retry(Retry retry) { - this.retry = retry; - return this; - } - - /** - * Creates a new {@link EventHubAsyncClient} based on options set on this builder. Every time {@code buildAsyncClient()} - * is invoked, a new instance of {@link EventHubAsyncClient} is created. + * Creates a new {@link EventHubAsyncClient} based on options set on this builder. Every time {@code + * buildAsyncClient()} is invoked, a new instance of {@link EventHubAsyncClient} is created. * *

* The following options are used if ones are not specified in the builder: @@ -266,7 +254,7 @@ public EventHubClientBuilder retry(Retry retry) { * is used to provide any shared configuration values. The configuration values read are the {@link * BaseConfigurations#HTTP_PROXY}, {@link ProxyConfiguration#PROXY_USERNAME}, and {@link * ProxyConfiguration#PROXY_PASSWORD}. - *

  • If no retry is specified, {@link Retry#getDefaultRetry() the default retry} is used.
  • + *
  • If no retry is specified, the default retry options are used.
  • *
  • If no proxy is specified, the builder checks the {@link ConfigurationManager#getConfiguration() global * configuration} for a configured proxy, then it checks to see if a system proxy is configured.
  • *
  • If no timeout is specified, a {@link ClientConstants#OPERATION_TIMEOUT timeout of one minute} is used.
  • @@ -293,12 +281,8 @@ public EventHubAsyncClient buildAsyncClient() { connectionString(connectionString); } - if (timeout == null) { - timeout = ClientConstants.OPERATION_TIMEOUT; - } - - if (retry == null) { - retry = Retry.getDefaultRetry(); + if (retryOptions == null) { + retryOptions = DEFAULT_RETRY; } // If the proxy has been configured by the user but they have overridden the TransportType with something that @@ -321,8 +305,8 @@ public EventHubAsyncClient buildAsyncClient() { final CBSAuthorizationType authorizationType = credentials instanceof EventHubSharedAccessKeyCredential ? CBSAuthorizationType.SHARED_ACCESS_SIGNATURE : CBSAuthorizationType.JSON_WEB_TOKEN; - final ConnectionOptions parameters = new ConnectionOptions(host, eventHubPath, credentials, - authorizationType, timeout, transport, retry, proxyConfiguration, scheduler); + final ConnectionOptions parameters = new ConnectionOptions(host, eventHubPath, credentials, authorizationType, + transport, retryOptions, proxyConfiguration, scheduler); return new EventHubAsyncClient(parameters, provider, handlerProvider); } diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java index af90f6076f6f..f19e07ed500b 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumer.java @@ -83,6 +83,8 @@ public class EventHubConsumer implements Closeable { link.getErrors().subscribe(error -> { logger.info("Error received in ReceiveLink. {}", error.toString()); + + //TODO (conniey): Surface error to EmitterProcessor. }); link.getShutdownSignals().subscribe(signal -> { diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducer.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducer.java index f83585b6058c..fc4013609f31 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducer.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducer.java @@ -11,6 +11,7 @@ import com.azure.messaging.eventhubs.implementation.AmqpSendLink; import com.azure.messaging.eventhubs.implementation.ErrorContextProvider; import com.azure.messaging.eventhubs.implementation.EventDataUtil; +import com.azure.messaging.eventhubs.models.BatchOptions; import com.azure.messaging.eventhubs.models.EventHubProducerOptions; import com.azure.messaging.eventhubs.models.SendOptions; import org.apache.qpid.proton.message.Message; @@ -338,7 +339,7 @@ private void verifyPartitionKey(String partitionKey) { @Override public void close() throws IOException { if (!isDisposed.getAndSet(true)) { - final AmqpSendLink block = sendLinkMono.block(senderOptions.timeout()); + final AmqpSendLink block = sendLinkMono.block(senderOptions.retry().tryTimeout()); if (block != null) { block.close(); } diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/CBSChannel.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/CBSChannel.java index 4ff385121c40..6f1059879317 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/CBSChannel.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/CBSChannel.java @@ -5,6 +5,7 @@ import com.azure.core.amqp.AmqpConnection; import com.azure.core.amqp.CBSNode; +import com.azure.core.amqp.RetryOptions; import com.azure.core.credentials.TokenCredential; import com.azure.core.util.logging.ClientLogger; import org.apache.qpid.proton.Proton; @@ -13,7 +14,6 @@ import org.apache.qpid.proton.message.Message; import reactor.core.publisher.Mono; -import java.time.Duration; import java.time.OffsetDateTime; import java.util.HashMap; import java.util.Locale; @@ -35,29 +35,29 @@ class CBSChannel extends EndpointStateNotifierBase implements CBSNode { private final TokenCredential credential; private final Mono cbsChannelMono; private final ReactorProvider provider; - private final Duration operationTimeout; private final CBSAuthorizationType authorizationType; + private final RetryOptions retryOptions; CBSChannel(AmqpConnection connection, TokenCredential tokenCredential, CBSAuthorizationType authorizationType, - ReactorProvider provider, ReactorHandlerProvider handlerProvider, Duration operationTimeout) { + ReactorProvider provider, ReactorHandlerProvider handlerProvider, RetryOptions retryOptions) { super(new ClientLogger(CBSChannel.class)); Objects.requireNonNull(connection); Objects.requireNonNull(tokenCredential); Objects.requireNonNull(authorizationType); Objects.requireNonNull(provider); - Objects.requireNonNull(operationTimeout); Objects.requireNonNull(handlerProvider); + Objects.requireNonNull(retryOptions); this.authorizationType = authorizationType; - this.operationTimeout = operationTimeout; + this.retryOptions = retryOptions; this.connection = connection; this.credential = tokenCredential; this.provider = provider; this.cbsChannelMono = connection.createSession(SESSION_NAME) .cast(ReactorSession.class) .map(session -> new RequestResponseChannel(connection.getIdentifier(), connection.getHost(), LINK_NAME, - CBS_ADDRESS, session.session(), handlerProvider)) + CBS_ADDRESS, session.session(), this.retryOptions, handlerProvider)) .cache(); } @@ -81,7 +81,7 @@ public Mono authorize(final String tokenAudience) { @Override public void close() { - final RequestResponseChannel channel = cbsChannelMono.block(operationTimeout); + final RequestResponseChannel channel = cbsChannelMono.block(retryOptions.tryTimeout()); if (channel != null) { channel.close(); } diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ConnectionOptions.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ConnectionOptions.java index 30fb02506097..7a4c59a343cc 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ConnectionOptions.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ConnectionOptions.java @@ -3,23 +3,21 @@ package com.azure.messaging.eventhubs.implementation; -import com.azure.core.amqp.Retry; +import com.azure.core.amqp.RetryOptions; import com.azure.core.amqp.TransportType; import com.azure.core.credentials.TokenCredential; import com.azure.messaging.eventhubs.models.ProxyConfiguration; import reactor.core.scheduler.Scheduler; -import java.time.Duration; import java.util.Objects; /** * A wrapper class that contains all parameters that are needed to establish a connection to an Event Hub. */ public class ConnectionOptions { - private final Duration timeout; private final TokenCredential tokenCredential; private final TransportType transport; - private final Retry retryPolicy; + private final RetryOptions retryOptions; private final ProxyConfiguration proxyConfiguration; private final Scheduler scheduler; private final String host; @@ -27,24 +25,22 @@ public class ConnectionOptions { private final CBSAuthorizationType authorizationType; public ConnectionOptions(String host, String eventHubPath, TokenCredential tokenCredential, - CBSAuthorizationType authorizationType, Duration timeout, TransportType transport, - Retry retryPolicy, ProxyConfiguration proxyConfiguration, Scheduler scheduler) { + CBSAuthorizationType authorizationType, TransportType transport, RetryOptions retryOptions, + ProxyConfiguration proxyConfiguration, Scheduler scheduler) { Objects.requireNonNull(host); Objects.requireNonNull(eventHubPath); - Objects.requireNonNull(timeout); Objects.requireNonNull(tokenCredential); Objects.requireNonNull(transport); - Objects.requireNonNull(retryPolicy); + Objects.requireNonNull(retryOptions); Objects.requireNonNull(proxyConfiguration); Objects.requireNonNull(scheduler); this.host = host; this.eventHubPath = eventHubPath; - this.timeout = timeout; this.tokenCredential = tokenCredential; this.authorizationType = authorizationType; this.transport = transport; - this.retryPolicy = retryPolicy; + this.retryOptions = retryOptions; this.proxyConfiguration = proxyConfiguration; this.scheduler = scheduler; } @@ -57,10 +53,6 @@ public String eventHubPath() { return eventHubPath; } - public Duration timeout() { - return timeout; - } - public TokenCredential tokenCredential() { return tokenCredential; } @@ -73,8 +65,8 @@ public TransportType transportType() { return transport; } - public Retry retryPolicy() { - return retryPolicy; + public RetryOptions retry() { + return retryOptions; } public ProxyConfiguration proxyConfiguration() { diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubSession.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubSession.java index 8b1760894cb0..ee73afd6e050 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubSession.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/EventHubSession.java @@ -5,7 +5,7 @@ import com.azure.core.amqp.AmqpLink; import com.azure.core.amqp.AmqpSession; -import com.azure.core.amqp.Retry; +import com.azure.core.amqp.RetryPolicy; import com.azure.messaging.eventhubs.EventHubConsumer; import reactor.core.publisher.Mono; @@ -35,5 +35,5 @@ public interface EventHubSession extends AmqpSession { * @return A newly created AMQP link. */ Mono createConsumer(String linkName, String entityPath, String eventPositionExpression, Duration timeout, - Retry retry, Long ownerLevel, String consumerIdentifier); + RetryPolicy retry, Long ownerLevel, String consumerIdentifier); } diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java index efd2d8f678c0..3dcc0b12ea04 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ManagementChannel.java @@ -4,6 +4,7 @@ package com.azure.messaging.eventhubs.implementation; import com.azure.core.amqp.AmqpConnection; +import com.azure.core.amqp.RetryOptions; import com.azure.core.credentials.TokenCredential; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.EventHubProperties; @@ -66,7 +67,7 @@ public class ManagementChannel extends EndpointStateNotifierBase implements Even * @param provider The dispatcher to execute work on Reactor. */ ManagementChannel(AmqpConnection connection, String eventHubPath, TokenCredential tokenProvider, - TokenResourceProvider audienceProvider, ReactorProvider provider, + TokenResourceProvider audienceProvider, ReactorProvider provider, RetryOptions retryOptions, ReactorHandlerProvider handlerProvider, AmqpResponseMapper mapper) { super(new ClientLogger(ManagementChannel.class)); @@ -77,6 +78,7 @@ public class ManagementChannel extends EndpointStateNotifierBase implements Even Objects.requireNonNull(provider); Objects.requireNonNull(handlerProvider); Objects.requireNonNull(mapper); + Objects.requireNonNull(retryOptions); this.audienceProvider = audienceProvider; this.connection = connection; @@ -87,7 +89,7 @@ public class ManagementChannel extends EndpointStateNotifierBase implements Even this.channelMono = connection.createSession(SESSION_NAME) .cast(ReactorSession.class) .map(session -> new RequestResponseChannel(connection.getIdentifier(), connection.getHost(), LINK_NAME, - ADDRESS, session.session(), handlerProvider)) + ADDRESS, session.session(), retryOptions, handlerProvider)) .cache(); } diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorConnection.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorConnection.java index 0c8434e2516a..d54f6e6e6a71 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorConnection.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorConnection.java @@ -7,6 +7,8 @@ import com.azure.core.amqp.AmqpExceptionHandler; import com.azure.core.amqp.AmqpSession; import com.azure.core.amqp.CBSNode; +import com.azure.core.amqp.RetryPolicy; +import com.azure.core.amqp.implementation.RetryUtil; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.implementation.handler.ConnectionHandler; import com.azure.messaging.eventhubs.implementation.handler.SessionHandler; @@ -38,6 +40,7 @@ public class ReactorConnection extends EndpointStateNotifierBase implements Even private final Disposable.Composite subscriptions; private final Mono managementChannelMono; private final TokenResourceProvider tokenResourceProvider; + private final RetryPolicy retryPolicy; private ReactorExecutor executor; //TODO (conniey): handle failures and recreating the Reactor. Resubscribing the handlers, etc. @@ -54,15 +57,17 @@ public class ReactorConnection extends EndpointStateNotifierBase implements Even * @param reactorProvider Provides proton-j Reactor instances. * @param handlerProvider Provides {@link BaseHandler} to listen to proton-j reactor events. */ - public ReactorConnection(String connectionId, ConnectionOptions connectionOptions, - ReactorProvider reactorProvider, ReactorHandlerProvider handlerProvider, AmqpResponseMapper mapper) { + public ReactorConnection(String connectionId, ConnectionOptions connectionOptions, ReactorProvider reactorProvider, + ReactorHandlerProvider handlerProvider, AmqpResponseMapper mapper) { super(new ClientLogger(ReactorConnection.class)); this.connectionOptions = connectionOptions; this.reactorProvider = reactorProvider; this.connectionId = connectionId; this.handlerProvider = handlerProvider; - this.handler = handlerProvider.createConnectionHandler(connectionId, connectionOptions.host(), connectionOptions.transportType()); + this.handler = handlerProvider.createConnectionHandler(connectionId, connectionOptions.host(), + connectionOptions.transportType()); + this.retryPolicy = RetryUtil.getRetryPolicy(connectionOptions.retry()); this.connectionMono = Mono.fromCallable(() -> getOrCreateConnection()) .doOnSubscribe(c -> hasConnection.set(true)); @@ -82,7 +87,7 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption this.managementChannelMono = connectionMono.then( Mono.fromCallable(() -> (EventHubManagementNode) new ManagementChannel(this, connectionOptions.eventHubPath(), connectionOptions.tokenCredential(), tokenResourceProvider, - reactorProvider, handlerProvider, mapper))).cache(); + reactorProvider, connectionOptions.retry(), handlerProvider, mapper))).cache(); } /** @@ -90,8 +95,9 @@ public ReactorConnection(String connectionId, ConnectionOptions connectionOption */ @Override public Mono getCBSNode() { - final Mono cbsNodeMono = getConnectionStates().takeUntil(x -> x == AmqpEndpointState.ACTIVE) - .timeout(connectionOptions.timeout()) + final Mono cbsNodeMono = RetryUtil.withRetry( + getConnectionStates().takeUntil(x -> x == AmqpEndpointState.ACTIVE), + connectionOptions.retry().tryTimeout(), retryPolicy) .then(Mono.fromCallable(() -> getOrCreateCBSNode())); return hasConnection.get() @@ -144,13 +150,13 @@ public Mono createSession(String sessionName) { } return connectionMono.map(connection -> sessionMap.computeIfAbsent(sessionName, key -> { - final SessionHandler handler = - handlerProvider.createSessionHandler(connectionId, getHost(), sessionName, connectionOptions.timeout()); + final SessionHandler handler = handlerProvider.createSessionHandler(connectionId, getHost(), sessionName, + connectionOptions.retry().tryTimeout()); final Session session = connection.session(); BaseHandler.setHandler(session, handler); - return new ReactorSession(session, handler, sessionName, reactorProvider, handlerProvider, - this.getCBSNode(), tokenResourceProvider, connectionOptions.timeout()); + return new ReactorSession(session, handler, sessionName, reactorProvider, handlerProvider, getCBSNode(), + tokenResourceProvider, connectionOptions.retry().tryTimeout()); })); } @@ -187,8 +193,7 @@ private synchronized CBSNode getOrCreateCBSNode() { logger.info("Setting CBS channel."); cbsChannel = new CBSChannel(this, connectionOptions.tokenCredential(), - connectionOptions.authorizationType(), reactorProvider, handlerProvider, - connectionOptions.timeout()); + connectionOptions.authorizationType(), reactorProvider, handlerProvider, connectionOptions.retry()); } return cbsChannel; @@ -203,7 +208,7 @@ private synchronized Connection getOrCreateConnection() throws IOException { reactorExceptionHandler = new ReactorExceptionHandler(); executor = new ReactorExecutor(reactor, connectionOptions.scheduler(), connectionId, - reactorExceptionHandler, connectionOptions.timeout(), connectionOptions.host()); + reactorExceptionHandler, connectionOptions.retry().tryTimeout(), connectionOptions.host()); executor.start(); } diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorSender.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorSender.java index 865d0a8565ca..77c40942eb88 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorSender.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorSender.java @@ -3,12 +3,13 @@ package com.azure.messaging.eventhubs.implementation; -import com.azure.core.amqp.Retry; +import com.azure.core.amqp.RetryPolicy; import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.exception.ErrorCondition; import com.azure.core.amqp.exception.ErrorContext; import com.azure.core.amqp.exception.ExceptionUtil; import com.azure.core.amqp.exception.OperationCancelledException; +import com.azure.core.amqp.implementation.RetryUtil; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.implementation.handler.SendLinkHandler; import org.apache.qpid.proton.Proton; @@ -44,6 +45,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static com.azure.messaging.eventhubs.implementation.EventDataUtil.getDataSerializedSize; import static java.nio.charset.StandardCharsets.UTF_8; @@ -60,17 +62,19 @@ class ReactorSender extends EndpointStateNotifierBase implements AmqpSendLink { private final AtomicBoolean hasConnected = new AtomicBoolean(); private final AtomicBoolean hasAuthorized = new AtomicBoolean(true); + private final AtomicInteger retryAttempts = new AtomicInteger(); private final Object pendingSendLock = new Object(); private final ConcurrentHashMap pendingSendsMap = new ConcurrentHashMap<>(); private final PriorityQueue pendingSendsQueue = new PriorityQueue<>(1000, new DeliveryTagComparator()); private final ActiveClientTokenManager tokenManager; - private final Retry retry; + private final RetryPolicy retry; private final Duration timeout; private final Timer sendTimeoutTimer = new Timer("SendTimeout-timer"); private final Object errorConditionLock = new Object(); + private volatile Exception lastKnownLinkError; private volatile Instant lastKnownErrorReportedAt; @@ -81,7 +85,7 @@ class ReactorSender extends EndpointStateNotifierBase implements AmqpSendLink { private volatile int maxMessageSize; ReactorSender(String entityPath, Sender sender, SendLinkHandler handler, ReactorProvider reactorProvider, - ActiveClientTokenManager tokenManager, Duration timeout, Retry retry, int maxMessageSize) { + ActiveClientTokenManager tokenManager, Duration timeout, RetryPolicy retry, int maxMessageSize) { super(new ClientLogger(ReactorSender.class)); this.entityPath = entityPath; this.sender = sender; @@ -210,9 +214,9 @@ public Mono getLinkSize() { return Mono.just(maxMessageSize); } - return handler.getEndpointStates() - .takeUntil(state -> state == EndpointState.ACTIVE) - .timeout(timeout) + return RetryUtil.withRetry( + handler.getEndpointStates().takeUntil(state -> state == EndpointState.ACTIVE), + timeout, retry) .then(Mono.fromCallable(() -> { final UnsignedLong remoteMaxMessageSize = sender.getRemoteMaxMessageSize(); @@ -239,8 +243,9 @@ private Mono send(byte[] bytes, int arrayOffset, int messageFormat) { if (hasConnected.get()) { return sendWorkItem; } else { - return handler.getEndpointStates().takeUntil(x -> x == EndpointState.ACTIVE) - .timeout(timeout) + return RetryUtil.withRetry( + handler.getEndpointStates().takeUntil(state -> state == EndpointState.ACTIVE), + timeout, retry) .then(sendWorkItem); } } @@ -348,8 +353,8 @@ private void processDeliveredMessage(Delivery delivery) { if (outcome instanceof Accepted) { synchronized (errorConditionLock) { - this.lastKnownLinkError = null; - this.retry.resetRetryInterval(); + lastKnownLinkError = null; + retryAttempts.set(0); } workItem.sink().success(); @@ -359,24 +364,27 @@ private void processDeliveredMessage(Delivery delivery) { final Exception exception = ExceptionUtil.toException(error.getCondition().toString(), error.getDescription(), handler.getErrorContext(sender)); + final int retryAttempt; if (isGeneralSendError(error.getCondition())) { synchronized (errorConditionLock) { - this.lastKnownLinkError = exception; - this.retry.incrementRetryCount(); + lastKnownLinkError = exception; + retryAttempt = retryAttempts.incrementAndGet(); } + } else { + retryAttempt = retryAttempts.get(); } - final Duration retryInterval = retry.getNextRetryInterval(exception, workItem.timeoutTracker().remaining()); + final Duration retryInterval = retry.calculateRetryDelay(exception, retryAttempt); - if (retryInterval == null) { - this.cleanupFailedSend(workItem, exception); + if (retryInterval.compareTo(workItem.timeoutTracker().remaining()) > 0) { + cleanupFailedSend(workItem, exception); } else { workItem.lastKnownException(exception); try { reactorProvider.getReactorDispatcher().invoke(() -> send(workItem), retryInterval); } catch (IOException | RejectedExecutionException schedulerException) { exception.initCause(schedulerException); - this.cleanupFailedSend( + cleanupFailedSend( workItem, new AmqpException(false, String.format(Locale.US, "Entity(%s): send operation failed while scheduling a" @@ -385,10 +393,10 @@ private void processDeliveredMessage(Delivery delivery) { } } } else if (outcome instanceof Released) { - this.cleanupFailedSend(workItem, new OperationCancelledException(outcome.toString(), + cleanupFailedSend(workItem, new OperationCancelledException(outcome.toString(), handler.getErrorContext(sender))); } else { - this.cleanupFailedSend(workItem, new AmqpException(false, outcome.toString(), + cleanupFailedSend(workItem, new AmqpException(false, outcome.toString(), handler.getErrorContext(sender))); } } diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorSession.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorSession.java index 66ee731b2062..afc7db936efe 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorSession.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/ReactorSession.java @@ -6,7 +6,8 @@ import com.azure.core.amqp.AmqpEndpointState; import com.azure.core.amqp.AmqpLink; import com.azure.core.amqp.CBSNode; -import com.azure.core.amqp.Retry; +import com.azure.core.amqp.RetryPolicy; +import com.azure.core.amqp.implementation.RetryUtil; import com.azure.core.implementation.util.ImplUtils; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.EventHubProducer; @@ -116,11 +117,12 @@ public Duration getOperationTimeout() { } @Override - public Mono createProducer(String linkName, String entityPath, Duration timeout, Retry retry) { + public Mono createProducer(String linkName, String entityPath, Duration timeout, RetryPolicy retry) { final ActiveClientTokenManager tokenManager = createTokenManager(entityPath); - return getConnectionStates().takeUntil(state -> state == AmqpEndpointState.ACTIVE) - .timeout(timeout) + return RetryUtil.withRetry( + getConnectionStates().takeUntil(state -> state == AmqpEndpointState.ACTIVE), + timeout, retry) .then(tokenManager.authorize().then(Mono.create(sink -> { final AmqpSendLink existingSender = openSendLinks.get(linkName); if (existingSender != null) { @@ -156,17 +158,17 @@ public Mono createProducer(String linkName, String entityPath, Duratio } @Override - public Mono createConsumer(String linkName, String entityPath, Duration timeout, Retry retry) { + public Mono createConsumer(String linkName, String entityPath, Duration timeout, RetryPolicy retry) { return createConsumer(linkName, entityPath, "", timeout, retry, null, null); } @Override public Mono createConsumer(String linkName, String entityPath, String eventPositionExpression, - Duration timeout, Retry retry, Long ownerLevel, String consumerIdentifier) { + Duration timeout, RetryPolicy retry, Long ownerLevel, String consumerIdentifier) { final ActiveClientTokenManager tokenManager = createTokenManager(entityPath); - return getConnectionStates().takeUntil(state -> state == AmqpEndpointState.ACTIVE) - .timeout(timeout) + return RetryUtil.withRetry( + getConnectionStates().takeUntil(state -> state == AmqpEndpointState.ACTIVE), timeout, retry) .then(tokenManager.authorize().then(Mono.create(sink -> { final AmqpReceiveLink existingReceiver = openReceiveLinks.get(linkName); if (existingReceiver != null) { diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/RequestResponseChannel.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/RequestResponseChannel.java index 9295cc885f5e..4f3c5e0f70f5 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/RequestResponseChannel.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/RequestResponseChannel.java @@ -3,9 +3,12 @@ package com.azure.messaging.eventhubs.implementation; +import com.azure.core.amqp.RetryOptions; +import com.azure.core.amqp.RetryPolicy; import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.exception.AmqpResponseCode; import com.azure.core.amqp.exception.ExceptionUtil; +import com.azure.core.amqp.implementation.RetryUtil; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.implementation.handler.ReceiveLinkHandler; import com.azure.messaging.eventhubs.implementation.handler.SendLinkHandler; @@ -28,6 +31,7 @@ import java.io.Closeable; import java.io.IOException; +import java.time.Duration; import java.util.UUID; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,14 +49,19 @@ class RequestResponseChannel implements Closeable { private final Sender sendLink; private final Receiver receiveLink; private final String replyTo; + private final Duration operationTimeout; private final AtomicBoolean hasOpened = new AtomicBoolean(); private final AtomicLong requestId = new AtomicLong(0); private final SendLinkHandler sendLinkHandler; private final ReceiveLinkHandler receiveLinkHandler; private final Disposable subscription; + private final RetryPolicy retryPolicy; RequestResponseChannel(String connectionId, String host, String linkName, String path, Session session, - ReactorHandlerProvider handlerProvider) { + RetryOptions retryOptions, ReactorHandlerProvider handlerProvider) { + this.operationTimeout = retryOptions.tryTimeout(); + this.retryPolicy = RetryUtil.getRetryPolicy(retryOptions); + this.replyTo = path.replace("$", "") + "-client-reply-to"; this.sendLink = session.sender(linkName + ":sender"); final Target target = new Target(); @@ -108,22 +117,23 @@ Mono sendWithAck(final Message message, final ReactorDispatcher dispatc message.setMessageId(messageId); message.setReplyTo(replyTo); - //TODO (conniey): timeout here if we can't get the link handlers to pass an "Active" state. - return Mono.when( - sendLinkHandler.getEndpointStates().takeUntil(x -> x == EndpointState.ACTIVE), - receiveLinkHandler.getEndpointStates().takeUntil(x -> x == EndpointState.ACTIVE)).then( - Mono.create(sink -> { - try { - logger.verbose("Scheduling on dispatcher. Message Id {}", messageId); - unconfirmedSends.putIfAbsent(messageId, sink); - - dispatcher.invoke(() -> { - send(message); - }); - } catch (IOException e) { - sink.error(e); - } - })); + return RetryUtil.withRetry( + Mono.when(sendLinkHandler.getEndpointStates().takeUntil(x -> x == EndpointState.ACTIVE), + receiveLinkHandler.getEndpointStates().takeUntil(x -> x == EndpointState.ACTIVE)), + operationTimeout, retryPolicy) + .then( + Mono.create(sink -> { + try { + logger.verbose("Scheduling on dispatcher. Message Id {}", messageId); + unconfirmedSends.putIfAbsent(messageId, sink); + + dispatcher.invoke(() -> { + send(message); + }); + } catch (IOException e) { + sink.error(e); + } + })); } private void start() { diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/BatchOptions.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/BatchOptions.java similarity index 82% rename from sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/BatchOptions.java rename to sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/BatchOptions.java index 622053df57d9..c9c3d7a41137 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/BatchOptions.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/BatchOptions.java @@ -1,7 +1,10 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -package com.azure.messaging.eventhubs; +package com.azure.messaging.eventhubs.models; + +import com.azure.messaging.eventhubs.EventDataBatch; +import com.azure.messaging.eventhubs.EventHubProducer; /** * The set of options that can be specified when creating an {@link EventDataBatch}. @@ -9,7 +12,7 @@ * @see EventHubProducer#createBatch() * @see EventHubProducer#createBatch(BatchOptions) */ -public class BatchOptions implements Cloneable { +public class BatchOptions { private int maximumSizeInBytes; private String partitionKey; @@ -60,18 +63,9 @@ public String partitionKey() { * * @return A shallow clone of this object. */ - @Override - public Object clone() { - BatchOptions clone; - try { - clone = (BatchOptions) super.clone(); - } catch (CloneNotSupportedException e) { - clone = new BatchOptions(); - } - - clone.partitionKey(partitionKey); - clone.maximumSizeInBytes(maximumSizeInBytes); - - return clone; + public BatchOptions clone() { + return new BatchOptions() + .partitionKey(partitionKey) + .maximumSizeInBytes(maximumSizeInBytes); } } diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/EventHubConsumerOptions.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/EventHubConsumerOptions.java index 3a751739c689..9fe84e3e57df 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/EventHubConsumerOptions.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/EventHubConsumerOptions.java @@ -3,10 +3,9 @@ package com.azure.messaging.eventhubs.models; -import com.azure.core.amqp.Retry; +import com.azure.core.amqp.RetryOptions; import com.azure.core.implementation.annotation.Fluent; import com.azure.core.implementation.util.ImplUtils; -import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.EventHubAsyncClient; import com.azure.messaging.eventhubs.EventHubConsumer; import reactor.core.scheduler.Scheduler; @@ -22,9 +21,7 @@ * @see EventHubAsyncClient#createConsumer(String, String, EventPosition, EventHubConsumerOptions) */ @Fluent -public class EventHubConsumerOptions implements Cloneable { - private final ClientLogger logger = new ClientLogger(EventHubConsumerOptions.class); - +public class EventHubConsumerOptions { /** * The maximum length, in characters, for the identifier assigned to an {@link EventHubConsumer}. */ @@ -43,7 +40,7 @@ public class EventHubConsumerOptions implements Cloneable { private String identifier; private Long ownerLevel; - private Retry retry; + private RetryOptions retry; private Scheduler scheduler; private int prefetchCount; @@ -106,7 +103,7 @@ public EventHubConsumerOptions ownerLevel(Long priority) { * @param retry The retry policy to use when receiving events. * @return The updated {@link EventHubConsumerOptions} object. */ - public EventHubConsumerOptions retry(Retry retry) { + public EventHubConsumerOptions retry(RetryOptions retry) { this.retry = retry; return this; } @@ -158,12 +155,12 @@ public String identifier() { } /** - * Gets the retry policy when receiving events. If not specified, the retry policy configured on the associated + * Gets the retry options when receiving events. If not specified, the retry options configured on the associated * {@link EventHubAsyncClient} is used. * - * @return The retry policy when receiving events. + * @return The retry options when receiving events. */ - public Retry retry() { + public RetryOptions retry() { return retry; } @@ -205,27 +202,16 @@ public int prefetchCount() { * @return A shallow clone of this object. */ public EventHubConsumerOptions clone() { - EventHubConsumerOptions clone; - try { - clone = (EventHubConsumerOptions) super.clone(); - } catch (CloneNotSupportedException e) { - clone = new EventHubConsumerOptions(); - } + final EventHubConsumerOptions clone = new EventHubConsumerOptions() + .scheduler(this.scheduler()) + .identifier(this.identifier()) + .prefetchCount(this.prefetchCount()) + .ownerLevel(this.ownerLevel()); if (retry != null) { - try { - clone.retry((Retry) retry.clone()); - } catch (CloneNotSupportedException e) { - logger.error("Unable to create clone of retry.", e); - clone.retry(retry); - } + clone.retry(retry.clone()); } - clone.scheduler(this.scheduler()); - clone.identifier(this.identifier()); - clone.prefetchCount(this.prefetchCount()); - clone.ownerLevel(this.ownerLevel()); - return clone; } } diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/EventHubProducerOptions.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/EventHubProducerOptions.java index b23c1f581e1b..e7a970aff5b7 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/EventHubProducerOptions.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/EventHubProducerOptions.java @@ -3,14 +3,11 @@ package com.azure.messaging.eventhubs.models; -import com.azure.core.amqp.Retry; +import com.azure.core.amqp.RetryOptions; import com.azure.core.implementation.annotation.Fluent; -import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.EventHubAsyncClient; import com.azure.messaging.eventhubs.EventHubProducer; -import java.time.Duration; - /** * The set of options that can be specified when creating an {@link EventHubProducer} to configure its behavior. * @@ -18,12 +15,9 @@ * @see EventHubAsyncClient#createProducer(EventHubProducerOptions) */ @Fluent -public class EventHubProducerOptions implements Cloneable { - private final ClientLogger logger = new ClientLogger(EventHubProducerOptions.class); - +public class EventHubProducerOptions { private String partitionId; - private Retry retry; - private Duration timeout; + private RetryOptions retryOptions; /** * Sets the identifier of the Event Hub partition that the {@link EventHubProducer} will be bound to, limiting it to @@ -43,36 +37,24 @@ public EventHubProducerOptions partitionId(String partitionId) { } /** - * Sets the retry policy used to govern retry attempts when an issue is encountered while sending. + * Sets the retry options used to govern retry attempts when an issue is encountered while sending. * - * @param retry The retry policy used to govern retry attempts when an issue is encountered while sending. + * @param retry The retry options used to govern retry attempts when an issue is encountered while sending. * @return The updated SenderOptions object. */ - public EventHubProducerOptions retry(Retry retry) { - this.retry = retry; - return this; - } - - /** - * Sets the default timeout to apply when sending events. If the timeout is reached, before the Event Hub - * acknowledges receipt of the event data being sent, the attempt will be considered failed and will be retried. - * - * @param timeout The timeout to apply when sending events. - * @return The updated {@link EventHubProducerOptions} object. - */ - public EventHubProducerOptions timeout(Duration timeout) { - this.timeout = timeout; + public EventHubProducerOptions retry(RetryOptions retry) { + this.retryOptions = retry; return this; } /** - * Gets the retry policy used to govern retry attempts when an issue is encountered while sending. + * Gets the retry options used to govern retry attempts when an issue is encountered while sending. * - * @return the retry policy used to govern retry attempts when an issue is encountered while sending. If {@code - * null}, then the retry policy configured on the associated {@link EventHubAsyncClient} is used. + * @return the retry options used to govern retry attempts when an issue is encountered while sending. If {@code + * null}, then the retry options configured on the associated {@link EventHubAsyncClient} is used. */ - public Retry retry() { - return retry; + public RetryOptions retry() { + return retryOptions; } /** @@ -88,41 +70,19 @@ public String partitionId() { return partitionId; } - /** - * Gets the default timeout when sending events. - * - * @return The default timeout when sending events. - */ - public Duration timeout() { - return timeout; - } - /** * Creates a clone of this instance. * * @return A shallow clone of this object. */ - @Override - public Object clone() { - EventHubProducerOptions clone; - try { - clone = (EventHubProducerOptions) super.clone(); - } catch (CloneNotSupportedException e) { - clone = new EventHubProducerOptions(); - } + public EventHubProducerOptions clone() { + final EventHubProducerOptions clone = new EventHubProducerOptions() + .partitionId(partitionId); - if (retry != null) { - try { - clone.retry((Retry) retry.clone()); - } catch (CloneNotSupportedException e) { - logger.error("Unable to create clone of retry.", e); - clone.retry(retry); - } + if (retryOptions != null) { + clone.retry(retryOptions.clone()); } - clone.partitionId(partitionId); - clone.timeout(timeout); - return clone; } } diff --git a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/SendOptions.java b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/SendOptions.java index 4dc36cb2f74b..9e7f0dd63c1d 100644 --- a/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/SendOptions.java +++ b/sdk/eventhubs/azure-eventhubs/src/main/java/com/azure/messaging/eventhubs/models/SendOptions.java @@ -17,7 +17,7 @@ * @see EventHubProducer#send(Flux, SendOptions) */ @Fluent -public class SendOptions implements Cloneable { +public class SendOptions { private String partitionKey; /** @@ -56,17 +56,8 @@ public String partitionKey() { * * @return A shallow clone of this object. */ - @Override - public Object clone() { - SendOptions clone; - try { - clone = (SendOptions) super.clone(); - } catch (CloneNotSupportedException e) { - clone = new SendOptions(); - } - - clone.partitionKey(partitionKey); - - return clone; + public SendOptions clone() { + return new SendOptions() + .partitionKey(partitionKey); } } diff --git a/sdk/eventhubs/azure-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubClientBuilderJavaDocCodeSamples.java b/sdk/eventhubs/azure-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubClientBuilderJavaDocCodeSamples.java index e9c8a26a9d0f..44087f615719 100644 --- a/sdk/eventhubs/azure-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubClientBuilderJavaDocCodeSamples.java +++ b/sdk/eventhubs/azure-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubClientBuilderJavaDocCodeSamples.java @@ -3,7 +3,7 @@ package com.azure.messaging.eventhubs; -import com.azure.core.amqp.Retry; +import com.azure.core.amqp.RetryOptions; import reactor.core.scheduler.Schedulers; import java.time.Duration; @@ -54,10 +54,11 @@ public void instantiationRetry() { String connectionString = "Endpoint={endpoint};SharedAccessKeyName={sharedAccessKeyName};" + "SharedAccessKey={sharedAccessKey};EntityPath={eventHubPath}"; + RetryOptions retryOptions = new RetryOptions() + .tryTimeout(Duration.ofSeconds(30)); EventHubAsyncClient client = new EventHubClientBuilder() .connectionString(connectionString) - .retry(Retry.getNoRetry()) - .timeout(Duration.ofSeconds(30)) + .retry(retryOptions) .scheduler(Schedulers.newElastic("dedicated-event-hub-scheduler")) .buildAsyncClient(); // END: com.azure.messaging.eventhubs.eventhubclientbuilder.retry-timeout-scheduler diff --git a/sdk/eventhubs/azure-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubProducerJavaDocCodeSamples.java b/sdk/eventhubs/azure-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubProducerJavaDocCodeSamples.java index 1b68c477b21b..2b690c00b41f 100644 --- a/sdk/eventhubs/azure-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubProducerJavaDocCodeSamples.java +++ b/sdk/eventhubs/azure-eventhubs/src/samples/java/com/azure/messaging/eventhubs/EventHubProducerJavaDocCodeSamples.java @@ -3,6 +3,8 @@ package com.azure.messaging.eventhubs; +import com.azure.core.amqp.RetryOptions; +import com.azure.messaging.eventhubs.models.BatchOptions; import com.azure.messaging.eventhubs.models.EventHubProducerOptions; import com.azure.messaging.eventhubs.models.SendOptions; import reactor.core.publisher.Flux; @@ -43,9 +45,11 @@ public void instantiate() throws IOException { */ public void instantiatePartitionProducer() throws IOException { // BEGIN: com.azure.messaging.eventhubs.eventhubproducer.instantiatePartitionProducer + RetryOptions retryOptions = new RetryOptions() + .tryTimeout(Duration.ofSeconds(45)); EventHubProducerOptions options = new EventHubProducerOptions() .partitionId("foo") - .timeout(Duration.ofSeconds(45)); + .retry(retryOptions); EventHubProducer producer = client.createProducer(options); // END: com.azure.messaging.eventhubs.eventhubproducer.instantiatePartitionProducer diff --git a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubClientIntegrationTest.java b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubClientIntegrationTest.java index 81de57969447..4ed4cd14dc77 100644 --- a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubClientIntegrationTest.java +++ b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubClientIntegrationTest.java @@ -23,6 +23,7 @@ import reactor.core.publisher.Flux; import reactor.test.StepVerifier; +import java.time.Instant; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -30,6 +31,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import static com.azure.messaging.eventhubs.EventHubAsyncClient.DEFAULT_CONSUMER_GROUP_NAME; import static com.azure.messaging.eventhubs.TestUtils.isMatchingEvent; @@ -49,6 +51,7 @@ public static Iterable getTransportTypes() { private static final String PARTITION_ID = "1"; private static final AtomicBoolean HAS_PUSHED_EVENTS = new AtomicBoolean(); + private static final AtomicReference MESSAGES_PUSHED_INSTANT = new AtomicReference<>(); private static final String MESSAGE_TRACKING_VALUE = UUID.randomUUID().toString(); private EventHubAsyncClient client; @@ -69,6 +72,8 @@ protected String testName() { @Override protected void beforeTest() { + skipIfNotRecordMode(); + final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(getReactorProvider()); final ConnectionOptions connectionOptions = getConnectionOptions(); @@ -93,13 +98,11 @@ public void nullConstructor() throws NullPointerException { */ @Test public void receiveMessage() { - skipIfNotRecordMode(); - // Arrange final EventHubConsumerOptions options = new EventHubConsumerOptions() .prefetchCount(2); final EventHubConsumer consumer = client.createConsumer(DEFAULT_CONSUMER_GROUP_NAME, PARTITION_ID, - EventPosition.earliest(), options); + EventPosition.fromEnqueuedTime(MESSAGES_PUSHED_INSTANT.get()), options); // Act & Assert StepVerifier.create(consumer.receive().filter(x -> isMatchingEvent(x, MESSAGE_TRACKING_VALUE)).take(NUMBER_OF_EVENTS)) @@ -192,6 +195,7 @@ private void setupEventTestData(EventHubAsyncClient client) { final Flux events = TestUtils.getEvents(NUMBER_OF_EVENTS, MESSAGE_TRACKING_VALUE); try { + MESSAGES_PUSHED_INSTANT.set(Instant.now()); producer.send(events).block(TIMEOUT); } finally { dispose(producer); diff --git a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubClientMetadataIntegrationTest.java b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubClientMetadataIntegrationTest.java index 7758f5d9c0aa..04fb640aa215 100644 --- a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubClientMetadataIntegrationTest.java +++ b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubClientMetadataIntegrationTest.java @@ -3,7 +3,6 @@ package com.azure.messaging.eventhubs; -import com.azure.core.amqp.Retry; import com.azure.core.amqp.TransportType; import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.exception.ErrorCondition; @@ -133,8 +132,8 @@ public void getPartitionPropertiesInvalidToken() throws InvalidKeyException, NoS final TokenCredential badTokenProvider = new EventHubSharedAccessKeyCredential( invalidCredentials.sharedAccessKeyName(), invalidCredentials.sharedAccessKey(), TIMEOUT); final ConnectionOptions connectionOptions = new ConnectionOptions(original.endpoint().getHost(), - original.eventHubPath(), badTokenProvider, getAuthorizationType(), TIMEOUT, - TransportType.AMQP, Retry.getNoRetry(), ProxyConfiguration.SYSTEM_DEFAULTS, getConnectionOptions().scheduler()); + original.eventHubPath(), badTokenProvider, getAuthorizationType(), TransportType.AMQP, RETRY_OPTIONS, + ProxyConfiguration.SYSTEM_DEFAULTS, getConnectionOptions().scheduler()); final EventHubAsyncClient client = new EventHubAsyncClient(connectionOptions, getReactorProvider(), handlerProvider); // Act & Assert @@ -158,8 +157,8 @@ public void getPartitionPropertiesNonExistentHub() { // Arrange final ConnectionStringProperties original = getConnectionStringProperties(); final ConnectionOptions connectionOptions = new ConnectionOptions(original.endpoint().getHost(), - "invalid-event-hub", getTokenCredential(), getAuthorizationType(), TIMEOUT, - TransportType.AMQP, Retry.getNoRetry(), ProxyConfiguration.SYSTEM_DEFAULTS, getConnectionOptions().scheduler()); + "invalid-event-hub", getTokenCredential(), getAuthorizationType(), TransportType.AMQP, + RETRY_OPTIONS, ProxyConfiguration.SYSTEM_DEFAULTS, getConnectionOptions().scheduler()); final EventHubAsyncClient client = new EventHubAsyncClient(connectionOptions, getReactorProvider(), handlerProvider); // Act & Assert diff --git a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerIntegrationTest.java b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerIntegrationTest.java index 45919dc24657..54c2b1f2605a 100644 --- a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerIntegrationTest.java +++ b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerIntegrationTest.java @@ -3,7 +3,6 @@ package com.azure.messaging.eventhubs; -import com.azure.core.amqp.Retry; import com.azure.core.amqp.TransportType; import com.azure.core.amqp.exception.AmqpException; import com.azure.core.util.logging.ClientLogger; @@ -74,8 +73,8 @@ protected void beforeTest() { final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(getReactorProvider()); final ConnectionStringProperties properties = new ConnectionStringProperties(getConnectionString()); final ConnectionOptions connectionOptions = new ConnectionOptions(properties.endpoint().getHost(), - properties.eventHubPath(), getTokenCredential(), getAuthorizationType(), TIMEOUT, TransportType.AMQP, - Retry.getNoRetry(), ProxyConfiguration.SYSTEM_DEFAULTS, Schedulers.newElastic("test-pool")); + properties.eventHubPath(), getTokenCredential(), getAuthorizationType(), TransportType.AMQP, + RETRY_OPTIONS, ProxyConfiguration.SYSTEM_DEFAULTS, Schedulers.parallel()); client = new EventHubAsyncClient(connectionOptions, getReactorProvider(), handlerProvider); } diff --git a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerTest.java b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerTest.java index b2190471f9db..66eeada7b47a 100644 --- a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerTest.java +++ b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubConsumerTest.java @@ -5,7 +5,7 @@ import com.azure.core.amqp.AmqpEndpointState; import com.azure.core.amqp.AmqpShutdownSignal; -import com.azure.core.amqp.Retry; +import com.azure.core.amqp.RetryOptions; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.implementation.AmqpReceiveLink; import com.azure.messaging.eventhubs.models.EventHubConsumerOptions; @@ -87,7 +87,7 @@ public void setup() { options = new EventHubConsumerOptions() .identifier("an-identifier") .prefetchCount(PREFETCH) - .retry(Retry.getNoRetry()) + .retry(new RetryOptions()) .scheduler(Schedulers.elastic()); consumer = new EventHubConsumer(receiveLinkMono, options); } diff --git a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerIntegrationTest.java b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerIntegrationTest.java index 8542a7983ff9..97ba60c95f94 100644 --- a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerIntegrationTest.java +++ b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerIntegrationTest.java @@ -3,13 +3,13 @@ package com.azure.messaging.eventhubs; -import com.azure.core.amqp.Retry; import com.azure.core.amqp.TransportType; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.implementation.ApiTestBase; import com.azure.messaging.eventhubs.implementation.ConnectionOptions; import com.azure.messaging.eventhubs.implementation.ConnectionStringProperties; import com.azure.messaging.eventhubs.implementation.ReactorHandlerProvider; +import com.azure.messaging.eventhubs.models.BatchOptions; import com.azure.messaging.eventhubs.models.EventHubProducerOptions; import com.azure.messaging.eventhubs.models.ProxyConfiguration; import org.junit.Assert; @@ -48,8 +48,8 @@ protected void beforeTest() { final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(getReactorProvider()); final ConnectionStringProperties properties = new ConnectionStringProperties(getConnectionString()); final ConnectionOptions connectionOptions = new ConnectionOptions(properties.endpoint().getHost(), - properties.eventHubPath(), getTokenCredential(), getAuthorizationType(), TIMEOUT, TransportType.AMQP, - Retry.getNoRetry(), ProxyConfiguration.SYSTEM_DEFAULTS, Schedulers.newSingle("single-threaded")); + properties.eventHubPath(), getTokenCredential(), getAuthorizationType(), TransportType.AMQP, RETRY_OPTIONS, + ProxyConfiguration.SYSTEM_DEFAULTS, Schedulers.parallel()); client = new EventHubAsyncClient(connectionOptions, getReactorProvider(), handlerProvider); } diff --git a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerTest.java b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerTest.java index f7c545aacfe1..536c16489b1f 100644 --- a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerTest.java +++ b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerTest.java @@ -3,10 +3,11 @@ package com.azure.messaging.eventhubs; -import com.azure.core.amqp.Retry; +import com.azure.core.amqp.RetryOptions; import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.exception.ErrorCondition; import com.azure.messaging.eventhubs.implementation.AmqpSendLink; +import com.azure.messaging.eventhubs.models.BatchOptions; import com.azure.messaging.eventhubs.models.EventHubProducerOptions; import com.azure.messaging.eventhubs.models.SendOptions; import org.apache.qpid.proton.amqp.messaging.Section; @@ -76,7 +77,8 @@ public void sendMultipleMessages() { when(sendLink.send(anyList())).thenReturn(Mono.empty()); final SendOptions options = new SendOptions(); - final EventHubProducerOptions producerOptions = new EventHubProducerOptions().retry(Retry.getNoRetry()).timeout(Duration.ofSeconds(30)); + final EventHubProducerOptions producerOptions = new EventHubProducerOptions() + .retry(new RetryOptions().tryTimeout(Duration.ofSeconds(30))); final EventHubProducer producer = new EventHubProducer(Mono.just(sendLink), producerOptions); // Act @@ -103,7 +105,8 @@ public void sendSingleMessage() { when(sendLink.send(any(Message.class))).thenReturn(Mono.empty()); final SendOptions options = new SendOptions(); - final EventHubProducerOptions producerOptions = new EventHubProducerOptions().retry(Retry.getNoRetry()).timeout(Duration.ofSeconds(30)); + final EventHubProducerOptions producerOptions = new EventHubProducerOptions() + .retry(new RetryOptions().tryTimeout(Duration.ofSeconds(30))); final EventHubProducer producer = new EventHubProducer(Mono.just(sendLink), producerOptions); // Act @@ -132,8 +135,7 @@ public void partitionProducerCannotSendWithPartitionKey() { final SendOptions options = new SendOptions().partitionKey("Some partition key"); final EventHubProducerOptions producerOptions = new EventHubProducerOptions() - .retry(Retry.getNoRetry()) - .timeout(Duration.ofSeconds(30)) + .retry(new RetryOptions().tryTimeout(Duration.ofSeconds(30))) .partitionId("my-partition-id"); final EventHubProducer producer = new EventHubProducer(Mono.just(sendLink), producerOptions); @@ -166,7 +168,7 @@ public void sendTooManyMessages() { }); final SendOptions options = new SendOptions(); - final EventHubProducerOptions producerOptions = new EventHubProducerOptions().retry(Retry.getNoRetry()).timeout(Duration.ofSeconds(30)); + final EventHubProducerOptions producerOptions = new EventHubProducerOptions().retry(new RetryOptions().tryTimeout(Duration.ofSeconds(30))); final EventHubProducer producer = new EventHubProducer(Mono.just(link), producerOptions); // Act & Assert @@ -199,7 +201,7 @@ public void createsEventDataBatch() { // This event will be 1025 bytes when serialized. final EventData tooLargeEvent = new EventData(new byte[maxEventPayload + 1]); - final EventHubProducerOptions producerOptions = new EventHubProducerOptions().retry(Retry.getNoRetry()).timeout(Duration.ofSeconds(30)); + final EventHubProducerOptions producerOptions = new EventHubProducerOptions().retry(new RetryOptions().tryTimeout(Duration.ofSeconds(30))); final EventHubProducer producer = new EventHubProducer(Mono.just(link), producerOptions); // Act & Assert @@ -237,7 +239,7 @@ public void createsEventDataBatchWithPartitionKey() { // This event is 1024 bytes when serialized. final EventData event = new EventData(new byte[eventPayload]); final BatchOptions options = new BatchOptions().partitionKey("some-key"); - final EventHubProducerOptions producerOptions = new EventHubProducerOptions().retry(Retry.getNoRetry()).timeout(Duration.ofSeconds(30)); + final EventHubProducerOptions producerOptions = new EventHubProducerOptions().retry(new RetryOptions().tryTimeout(Duration.ofSeconds(30))); final EventHubProducer producer = new EventHubProducer(Mono.just(link), producerOptions); // Act & Assert @@ -263,7 +265,7 @@ public void createEventDataBatchWhenMaxSizeIsTooBig() { // This event is 1024 bytes when serialized. final BatchOptions options = new BatchOptions().maximumSizeInBytes(batchSize); - final EventHubProducerOptions producerOptions = new EventHubProducerOptions().retry(Retry.getNoRetry()).timeout(Duration.ofSeconds(30)); + final EventHubProducerOptions producerOptions = new EventHubProducerOptions().retry(new RetryOptions().tryTimeout(Duration.ofSeconds(30))); final EventHubProducer producer = new EventHubProducer(Mono.just(link), producerOptions); // Act & Assert @@ -296,7 +298,7 @@ public void createsEventDataBatchWithSize() { final EventData tooLargeEvent = new EventData(new byte[maxEventPayload + 1]); final BatchOptions options = new BatchOptions().maximumSizeInBytes(batchSize); - final EventHubProducerOptions producerOptions = new EventHubProducerOptions().retry(Retry.getNoRetry()).timeout(Duration.ofSeconds(30)); + final EventHubProducerOptions producerOptions = new EventHubProducerOptions().retry(new RetryOptions().tryTimeout(Duration.ofSeconds(30))); final EventHubProducer producer = new EventHubProducer(Mono.just(link), producerOptions); // Act & Assert @@ -325,7 +327,7 @@ public void batchOptionsIsCloned() { final String originalKey = "some-key"; final BatchOptions options = new BatchOptions().partitionKey(originalKey); - final EventHubProducerOptions producerOptions = new EventHubProducerOptions().retry(Retry.getNoRetry()).timeout(Duration.ofSeconds(30)); + final EventHubProducerOptions producerOptions = new EventHubProducerOptions().retry(new RetryOptions().tryTimeout(Duration.ofSeconds(30))); final EventHubProducer producer = new EventHubProducer(Mono.just(link), producerOptions); // Act & Assert @@ -355,7 +357,7 @@ public void sendsAnEventDataBatch() { // This event will be 1025 bytes when serialized. final EventData tooLargeEvent = new EventData(new byte[maxEventPayload + 1]); - final EventHubProducerOptions producerOptions = new EventHubProducerOptions().retry(Retry.getNoRetry()).timeout(Duration.ofSeconds(30)); + final EventHubProducerOptions producerOptions = new EventHubProducerOptions().retry(new RetryOptions().tryTimeout(Duration.ofSeconds(30))); final EventHubProducer producer = new EventHubProducer(Mono.just(link), producerOptions); // Act & Assert diff --git a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/InteropAmqpPropertiesTest.java b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/InteropAmqpPropertiesTest.java index c3aefcb3c4eb..41efd7a6bbcb 100644 --- a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/InteropAmqpPropertiesTest.java +++ b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/InteropAmqpPropertiesTest.java @@ -4,7 +4,7 @@ package com.azure.messaging.eventhubs; import com.azure.core.amqp.MessageConstant; -import com.azure.core.amqp.Retry; +import com.azure.core.amqp.RetryOptions; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.implementation.ApiTestBase; import com.azure.messaging.eventhubs.implementation.ReactorHandlerProvider; @@ -59,7 +59,8 @@ protected void beforeTest() { final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(getReactorProvider()); client = new EventHubAsyncClient(getConnectionOptions(), getReactorProvider(), handlerProvider); - final EventHubProducerOptions producerOptions = new EventHubProducerOptions().partitionId(PARTITION_ID).retry(Retry.getNoRetry()).timeout(Duration.ofSeconds(30)); + final EventHubProducerOptions producerOptions = new EventHubProducerOptions().partitionId(PARTITION_ID) + .retry(new RetryOptions().tryTimeout(Duration.ofSeconds(30))); producer = client.createProducer(producerOptions); consumer = client.createConsumer(EventHubAsyncClient.DEFAULT_CONSUMER_GROUP_NAME, PARTITION_ID, EventPosition.latest()); } diff --git a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/SetPrefetchCountTest.java b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/SetPrefetchCountTest.java index 80abf78fe0b7..4af9f36ea310 100644 --- a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/SetPrefetchCountTest.java +++ b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/SetPrefetchCountTest.java @@ -3,7 +3,6 @@ package com.azure.messaging.eventhubs; -import com.azure.core.amqp.Retry; import com.azure.core.util.logging.ClientLogger; import com.azure.messaging.eventhubs.implementation.ApiTestBase; import com.azure.messaging.eventhubs.implementation.ReactorHandlerProvider; @@ -86,7 +85,7 @@ public void setLargePrefetchCount() throws InterruptedException { final int eventCount = NUMBER_OF_EVENTS; final CountDownLatch countDownLatch = new CountDownLatch(eventCount); final EventHubConsumerOptions options = new EventHubConsumerOptions() - .retry(Retry.getDefaultRetry()) + .retry(RETRY_OPTIONS) .prefetchCount(2000); consumer = client.createConsumer(EventHubAsyncClient.DEFAULT_CONSUMER_GROUP_NAME, PARTITION_ID, @@ -153,7 +152,7 @@ private void setupEventTestData(EventHubAsyncClient client) { try { MESSAGES_PUSHED_INSTANT.set(Instant.now()); - producer.send(events).block(TIMEOUT); + producer.send(events).block(RETRY_OPTIONS.tryTimeout()); } finally { dispose(producer); } diff --git a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ApiTestBase.java b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ApiTestBase.java index d30d403d6413..155e7a73bcf0 100644 --- a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ApiTestBase.java +++ b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ApiTestBase.java @@ -3,7 +3,7 @@ package com.azure.messaging.eventhubs.implementation; -import com.azure.core.amqp.Retry; +import com.azure.core.amqp.RetryOptions; import com.azure.core.amqp.TransportType; import com.azure.core.credentials.TokenCredential; import com.azure.core.implementation.util.ImplUtils; @@ -36,11 +36,12 @@ */ public abstract class ApiTestBase extends TestBase { protected static final Duration TIMEOUT = Duration.ofSeconds(30); + protected static final RetryOptions RETRY_OPTIONS = new RetryOptions().tryTimeout(TIMEOUT); protected final ClientLogger logger; private static final String EVENT_HUB_CONNECTION_STRING_ENV_NAME = "AZURE_EVENTHUBS_CONNECTION_STRING"; private static final String CONNECTION_STRING = System.getenv(EVENT_HUB_CONNECTION_STRING_ENV_NAME); - private static final String TEST_CONNECTION_STRING = "Endpoint=sb://test-event-hub.servicebus.windows.net/;SharedAccessKeyName=myaccount;SharedAccessKey=ctzMq410TV3wS7upTBcunJTDLEJwMAZuFPfr0mrrA08=;EntityPath=eventhub1;"; + private static final String TEST_CONNECTION_STRING = "Endpoint=sb://test-event-hub.servicebus.windows.net/;SharedAccessKeyName=dummyaccount;SharedAccessKey=ctzMq410TV3wS7upTBcunJTDLEJwMAZuFPfr0mrrA08=;EntityPath=non-existent-hub;"; private ConnectionStringProperties properties; private Reactor reactor = mock(Reactor.class); @@ -60,7 +61,7 @@ protected ApiTestBase(ClientLogger logger) { public void setupTest() { logger.info("[{}]: Performing test set-up.", testName()); - final Scheduler scheduler = Schedulers.newElastic("AMQPConnection"); + final Scheduler scheduler = Schedulers.newParallel("AMQPConnection"); final String connectionString = getTestMode() == TestMode.RECORD ? CONNECTION_STRING : TEST_CONNECTION_STRING; @@ -87,8 +88,8 @@ public void setupTest() { } connectionOptions = new ConnectionOptions(properties.endpoint().getHost(), properties.eventHubPath(), - tokenCredential, getAuthorizationType(), TIMEOUT, transportType, Retry.getNoRetry(), - ProxyConfiguration.SYSTEM_DEFAULTS, scheduler); + tokenCredential, getAuthorizationType(), transportType, RETRY_OPTIONS, ProxyConfiguration.SYSTEM_DEFAULTS, + scheduler); beforeTest(); } diff --git a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/CBSChannelTest.java b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/CBSChannelTest.java index 35e5c44fe387..2e6e9fad6582 100644 --- a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/CBSChannelTest.java +++ b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/CBSChannelTest.java @@ -5,6 +5,7 @@ import com.azure.core.amqp.AmqpConnection; import com.azure.core.amqp.CBSNode; +import com.azure.core.amqp.RetryOptions; import com.azure.core.amqp.exception.AmqpException; import com.azure.core.amqp.exception.ErrorCondition; import com.azure.core.credentials.TokenCredential; @@ -59,7 +60,7 @@ protected void beforeTest() { handlerProvider, mapper); cbsChannel = new CBSChannel(connection, getTokenCredential(), getAuthorizationType(), getReactorProvider(), - handlerProvider, Duration.ofMinutes(5)); + handlerProvider, new RetryOptions().tryTimeout(Duration.ofMinutes(5))); } @Override @@ -106,7 +107,7 @@ public void unsuccessfulAuthorize() { } final CBSNode node = new CBSChannel(connection, tokenProvider, getAuthorizationType(), getReactorProvider(), - handlerProvider, Duration.ofMinutes(5)); + handlerProvider, new RetryOptions().tryTimeout(Duration.ofMinutes(5))); // Act & Assert StepVerifier.create(node.authorize(tokenAudience)) diff --git a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ReactorConnectionTest.java b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ReactorConnectionTest.java index e631a76f3fb9..0c1d54e7b888 100644 --- a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ReactorConnectionTest.java +++ b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/ReactorConnectionTest.java @@ -5,12 +5,13 @@ import com.azure.core.amqp.AmqpConnection; import com.azure.core.amqp.AmqpEndpointState; -import com.azure.core.amqp.Retry; +import com.azure.core.amqp.RetryMode; +import com.azure.core.amqp.RetryOptions; import com.azure.core.amqp.TransportType; import com.azure.core.credentials.TokenCredential; -import com.azure.messaging.eventhubs.models.ProxyConfiguration; import com.azure.messaging.eventhubs.implementation.handler.ConnectionHandler; import com.azure.messaging.eventhubs.implementation.handler.SessionHandler; +import com.azure.messaging.eventhubs.models.ProxyConfiguration; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Connection; @@ -87,9 +88,10 @@ public void setup() throws IOException { sessionHandler = new SessionHandler(CONNECTION_ID, HOSTNAME, SESSION_NAME, reactorDispatcher, TEST_DURATION); reactorHandlerProvider = new MockReactorHandlerProvider(reactorProvider, connectionHandler, sessionHandler, null, null); + final RetryOptions retryOptions = new RetryOptions().tryTimeout(TEST_DURATION); final ConnectionOptions connectionOptions = new ConnectionOptions(CREDENTIAL_INFO.endpoint().getHost(), - CREDENTIAL_INFO.eventHubPath(), tokenProvider, CBSAuthorizationType.SHARED_ACCESS_SIGNATURE, TEST_DURATION, - TransportType.AMQP, Retry.getDefaultRetry(), ProxyConfiguration.SYSTEM_DEFAULTS, SCHEDULER); + CREDENTIAL_INFO.eventHubPath(), tokenProvider, CBSAuthorizationType.SHARED_ACCESS_SIGNATURE, + TransportType.AMQP, retryOptions, ProxyConfiguration.SYSTEM_DEFAULTS, SCHEDULER); connection = new ReactorConnection(CONNECTION_ID, connectionOptions, reactorProvider, reactorHandlerProvider, responseMapper); } @@ -281,10 +283,15 @@ public void createCBSNode() { @Test public void createCBSNodeTimeoutException() { // Arrange - Duration timeout = Duration.ofSeconds(5); + Duration timeout = Duration.ofSeconds(2); + RetryOptions retryOptions = new RetryOptions() + .maxRetries(2) + .delay(Duration.ofMillis(200)) + .retryMode(RetryMode.FIXED) + .tryTimeout(timeout); ConnectionOptions parameters = new ConnectionOptions(CREDENTIAL_INFO.endpoint().getHost(), - CREDENTIAL_INFO.eventHubPath(), tokenProvider, CBSAuthorizationType.SHARED_ACCESS_SIGNATURE, timeout, - TransportType.AMQP, Retry.getDefaultRetry(), ProxyConfiguration.SYSTEM_DEFAULTS, Schedulers.elastic()); + CREDENTIAL_INFO.eventHubPath(), tokenProvider, CBSAuthorizationType.SHARED_ACCESS_SIGNATURE, + TransportType.AMQP, retryOptions, ProxyConfiguration.SYSTEM_DEFAULTS, Schedulers.parallel()); // Act and Assert try (ReactorConnection connectionBad = new ReactorConnection(CONNECTION_ID, parameters, reactorProvider, reactorHandlerProvider, responseMapper)) { diff --git a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/BatchOptionsTest.java b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/models/BatchOptionsTest.java similarity index 93% rename from sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/BatchOptionsTest.java rename to sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/models/BatchOptionsTest.java index 9a1ab65b9e8a..43b3f7fb25b2 100644 --- a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/BatchOptionsTest.java +++ b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/models/BatchOptionsTest.java @@ -1,7 +1,7 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -package com.azure.messaging.eventhubs; +package com.azure.messaging.eventhubs.models; import org.junit.Assert; import org.junit.Test; @@ -41,7 +41,7 @@ public void cloneIdentical() { BatchOptions options = new BatchOptions().partitionKey(partitionKey).maximumSizeInBytes(size); // Act - BatchOptions clone = (BatchOptions) options.clone(); + BatchOptions clone = options.clone(); // Assert Assert.assertNotSame(clone, options); @@ -66,7 +66,7 @@ public void cloneModifyContents() { int size = 24; BatchOptions options = new BatchOptions().partitionKey(originalPartitionKey).maximumSizeInBytes(originalSize); - BatchOptions clone = (BatchOptions) options.clone(); + BatchOptions clone = options.clone(); // Act clone.partitionKey(partitionKey) diff --git a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/models/EventHubProducerOptionsTest.java b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/models/EventHubProducerOptionsTest.java index eba44d5b987f..d91bd7bbc938 100644 --- a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/models/EventHubProducerOptionsTest.java +++ b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/models/EventHubProducerOptionsTest.java @@ -3,8 +3,7 @@ package com.azure.messaging.eventhubs.models; -import com.azure.core.amqp.ExponentialRetry; -import com.azure.core.amqp.Retry; +import com.azure.core.amqp.RetryOptions; import org.junit.Assert; import org.junit.Test; @@ -16,22 +15,25 @@ public void cloneProperties() { // Arrange String partitionId = "my-partition-id"; Duration timeout = Duration.ofMinutes(10); - Retry retry = new ExponentialRetry(Duration.ofSeconds(20), Duration.ofSeconds(30), 3); + RetryOptions retryOptions = new RetryOptions() + .tryTimeout(timeout) + .delay(Duration.ofSeconds(20)) + .maxDelay(Duration.ofSeconds(30)) + .maxRetries(3); EventHubProducerOptions options = new EventHubProducerOptions(); options.partitionId(partitionId) - .timeout(timeout) - .retry(retry) + .retry(retryOptions) .partitionId(partitionId); // Act - EventHubProducerOptions clone = (EventHubProducerOptions) options.clone(); + EventHubProducerOptions clone = options.clone(); // Assert Assert.assertEquals(partitionId, clone.partitionId()); - Assert.assertEquals(timeout, clone.timeout()); - Assert.assertEquals(retry, clone.retry()); + Assert.assertEquals(timeout, clone.retry().tryTimeout()); + Assert.assertEquals(retryOptions, clone.retry()); - Assert.assertNotSame(retry, clone.retry()); + Assert.assertNotSame(retryOptions, clone.retry()); } } diff --git a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/SendOptionsTest.java b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/models/SendOptionsTest.java similarity index 90% rename from sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/SendOptionsTest.java rename to sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/models/SendOptionsTest.java index 199497e6bd64..807969dc58a4 100644 --- a/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/SendOptionsTest.java +++ b/sdk/eventhubs/azure-eventhubs/src/test/java/com/azure/messaging/eventhubs/models/SendOptionsTest.java @@ -1,9 +1,8 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the MIT License. -package com.azure.messaging.eventhubs; +package com.azure.messaging.eventhubs.models; -import com.azure.messaging.eventhubs.models.SendOptions; import org.junit.Assert; import org.junit.Test; @@ -47,7 +46,7 @@ public void cloneIdentical() { SendOptions options = new SendOptions().partitionKey(partitionKey); // Act - SendOptions clone = (SendOptions) options.clone(); + SendOptions clone = options.clone(); // Assert Assert.assertNotSame(clone, options); @@ -67,7 +66,7 @@ public void cloneModifyContents() { String partitionKey = "A new partition key"; SendOptions options = new SendOptions().partitionKey(originalPartitionKey); - SendOptions clone = (SendOptions) options.clone(); + SendOptions clone = options.clone(); // Act clone.partitionKey(partitionKey);