diff --git a/sdk/core/core-amqp/src/retry.ts b/sdk/core/core-amqp/src/retry.ts index adec1cb2bd34..1662245ed7d3 100644 --- a/sdk/core/core-amqp/src/retry.ts +++ b/sdk/core/core-amqp/src/retry.ts @@ -169,6 +169,9 @@ export async function retry(config: RetryConfig): Promise { if (config.minExponentialRetryDelayInMs == undefined || config.minExponentialRetryDelayInMs < 0) { config.minExponentialRetryDelayInMs = defaultMinDelayForExponentialRetryInMs; } + if (config.retryPolicy == undefined) { + config.retryPolicy = RetryPolicy.LinearRetryPolicy; + } let lastError: MessagingError | undefined; let result: any; let success = false; diff --git a/sdk/eventhub/event-hubs/review/event-hubs.api.md b/sdk/eventhub/event-hubs/review/event-hubs.api.md index 2df4c943a29e..ce8edfba9428 100644 --- a/sdk/eventhub/event-hubs/review/event-hubs.api.md +++ b/sdk/eventhub/event-hubs/review/event-hubs.api.md @@ -15,6 +15,7 @@ import { EventHubConnectionConfig } from '@azure/core-amqp'; import { MessagingError } from '@azure/core-amqp'; import { Receiver } from 'rhea-promise'; import { ReceiverOptions } from 'rhea-promise'; +import { RetryPolicy } from '@azure/core-amqp'; import { Sender } from 'rhea-promise'; import { SharedKeyCredential } from '@azure/core-amqp'; import { TokenCredential } from '@azure/core-amqp'; @@ -190,8 +191,11 @@ export class ReceiveHandler { // @public export interface RetryOptions { + maxExponentialRetryDelayInMs?: number; maxRetries?: number; + minExponentialRetryDelayInMs?: number; retryInterval?: number; + retryPolicy?: RetryPolicy; timeoutInMs?: number; } diff --git a/sdk/eventhub/event-hubs/src/eventHubClient.ts b/sdk/eventhub/event-hubs/src/eventHubClient.ts index 8eef02f7087b..36003e442ae1 100644 --- a/sdk/eventhub/event-hubs/src/eventHubClient.ts +++ b/sdk/eventhub/event-hubs/src/eventHubClient.ts @@ -10,7 +10,8 @@ import { SharedKeyCredential, ConnectionConfig, isTokenCredential, - Constants + Constants, + RetryPolicy } from "@azure/core-amqp"; import { ConnectionContext } from "./connectionContext"; @@ -40,16 +41,20 @@ export interface RetryOptions { * A minimum value of 60 seconds will be used if a value not greater than this is provided. */ timeoutInMs?: number; - // /** - // * The maximum value the `retryInterval` gets incremented exponentially between retries. - // * Not applicable, when `isExponential` is set to `false`. - // */ - // maxRetryInterval?: number; - // /** - // * Boolean denoting if the `retryInterval` should be incremented exponentially between - // * retries or kept the same. - // */ - // isExponential?: boolean; + /** + * @property {RetryPolicy} [retryPolicy] Denotes which retry policy to apply. If undefined, defaults to `LinearRetryPolicy` + */ + retryPolicy?: RetryPolicy; + /** + * @property {number} [maxExponentialRetryDelayInMs] Denotes the maximum delay between retries + * that the retry attempts will be capped at. Applicable only when performing exponential retry. + */ + maxExponentialRetryDelayInMs?: number; + /** + * @property {number} [minExponentialRetryDelayInMs] Denotes the minimum delay between retries + * to use. Applicable only when performing exponential retry. + */ + minExponentialRetryDelayInMs?: number; } export function getRetryAttemptTimeoutInMs(retryOptions: RetryOptions | undefined): number { diff --git a/sdk/eventhub/event-hubs/src/eventHubSender.ts b/sdk/eventhub/event-hubs/src/eventHubSender.ts index 5b24d7a54fa0..480fec5fe9c5 100644 --- a/sdk/eventhub/event-hubs/src/eventHubSender.ts +++ b/sdk/eventhub/event-hubs/src/eventHubSender.ts @@ -26,10 +26,14 @@ import { import { EventData, toAmqpMessage } from "./eventData"; import { ConnectionContext } from "./connectionContext"; import { LinkEntity } from "./linkEntity"; -import { SendOptions, EventHubProducerOptions } from "./eventHubClient"; +import { + SendOptions, + EventHubProducerOptions, + getRetryAttemptTimeoutInMs, + RetryOptions +} from "./eventHubClient"; import { AbortSignalLike, AbortError } from "@azure/abort-controller"; import { EventDataBatch } from "./eventDataBatch"; -import { getRetryAttemptTimeoutInMs, RetryOptions } from "./eventHubClient"; /** * @ignore @@ -347,12 +351,14 @@ export class EventHubSender extends LinkEntity { * @returns Promise * @throws {AbortError} Thrown if the operation is cancelled via the abortSignal. */ - async getMaxMessageSize(options?: { - retryOptions?: RetryOptions; - abortSignal?: AbortSignalLike; - }): Promise { - const abortSignal = options && options.abortSignal; - const retryOptions = options && options.retryOptions; + async getMaxMessageSize( + options: { + retryOptions?: RetryOptions; + abortSignal?: AbortSignalLike; + } = {} + ): Promise { + const abortSignal = options.abortSignal; + const retryOptions = options.retryOptions || {}; if (this.isOpen()) { return this._sender!.maxMessageSize; } @@ -386,23 +392,18 @@ export class EventHubSender extends LinkEntity { this.senderLock ); await defaultLock.acquire(this.senderLock, () => { - const maxRetries = - retryOptions && typeof retryOptions.maxRetries === "number" - ? retryOptions.maxRetries - : Constants.defaultMaxRetries; - const retryInterval = - retryOptions && - typeof retryOptions.retryInterval === "number" && - retryOptions.retryInterval > 0 - ? retryOptions.retryInterval / 1000 - : Constants.defaultDelayBetweenOperationRetriesInSeconds; - const config: RetryConfig = { operation: () => this._init(), connectionId: this._context.connectionId, operationType: RetryOperationType.senderLink, - maxRetries: maxRetries, - delayInSeconds: retryInterval + maxRetries: retryOptions.maxRetries, + delayInSeconds: + typeof retryOptions.retryInterval === "number" + ? retryOptions.retryInterval / 1000 + : undefined, + retryPolicy: retryOptions.retryPolicy, + minExponentialRetryDelayInMs: retryOptions.minExponentialRetryDelayInMs, + maxExponentialRetryDelayInMs: retryOptions.maxExponentialRetryDelayInMs }; return retry(config); @@ -555,6 +556,7 @@ export class EventHubSender extends LinkEntity { options: SendOptions & EventHubProducerOptions = {} ): Promise { const abortSignal: AbortSignalLike | undefined = options.abortSignal; + const retryOptions = options.retryOptions || {}; const sendEventPromise = () => new Promise(async (resolve, reject) => { let waitTimer: any; @@ -567,9 +569,8 @@ export class EventHubSender extends LinkEntity { const rejectOnAbort = () => { const desc: string = - `[${this._context.connectionId}] The send operation on the Sender "${ - this.name - }" with ` + `address "${this.address}" has been cancelled by the user.`; + `[${this._context.connectionId}] The send operation on the Sender "${this.name}" with ` + + `address "${this.address}" has been cancelled by the user.`; log.error(desc); return reject(new AbortError("The send operation has been cancelled by the user.")); }; @@ -742,19 +743,18 @@ export class EventHubSender extends LinkEntity { } }); - const maxRetries = options.retryOptions && options.retryOptions.maxRetries; - const delayInSeconds = - options.retryOptions && - options.retryOptions.retryInterval && - options.retryOptions.retryInterval >= 0 - ? options.retryOptions.retryInterval / 1000 - : Constants.defaultDelayBetweenOperationRetriesInSeconds; const config: RetryConfig = { operation: sendEventPromise, connectionId: this._context.connectionId, operationType: RetryOperationType.sendMessage, - maxRetries: maxRetries, - delayInSeconds: delayInSeconds + maxRetries: retryOptions.maxRetries, + delayInSeconds: + typeof retryOptions.retryInterval === "number" + ? retryOptions.retryInterval / 1000 + : undefined, + retryPolicy: retryOptions.retryPolicy, + minExponentialRetryDelayInMs: retryOptions.minExponentialRetryDelayInMs, + maxExponentialRetryDelayInMs: retryOptions.maxExponentialRetryDelayInMs }; return retry(config); } diff --git a/sdk/eventhub/event-hubs/src/managementClient.ts b/sdk/eventhub/event-hubs/src/managementClient.ts index 8b85ab83bfc0..6bdd584ce5e4 100644 --- a/sdk/eventhub/event-hubs/src/managementClient.ts +++ b/sdk/eventhub/event-hubs/src/managementClient.ts @@ -313,6 +313,7 @@ export class ManagementClient extends LinkEntity { requestName?: string; } = {} ): Promise { + const retryOptions = options.retryOptions || {}; try { const aborter: AbortSignalLike | undefined = options && options.abortSignal; @@ -414,19 +415,18 @@ export class ManagementClient extends LinkEntity { } }); - const maxRetries = options.retryOptions && options.retryOptions.maxRetries; - const delayInSeconds = - options.retryOptions && - options.retryOptions.retryInterval && - options.retryOptions.retryInterval >= 0 - ? options.retryOptions.retryInterval / 1000 - : Constants.defaultDelayBetweenOperationRetriesInSeconds; const config: RetryConfig = { operation: sendOperationPromise, connectionId: this._context.connectionId, operationType: RetryOperationType.management, - maxRetries: maxRetries, - delayInSeconds: delayInSeconds + maxRetries: retryOptions.maxRetries, + delayInSeconds: + typeof retryOptions.retryInterval === "number" + ? retryOptions.retryInterval / 1000 + : undefined, + retryPolicy: retryOptions.retryPolicy, + minExponentialRetryDelayInMs: retryOptions.minExponentialRetryDelayInMs, + maxExponentialRetryDelayInMs: retryOptions.maxExponentialRetryDelayInMs }; return (await retry(config)).body; } catch (err) { diff --git a/sdk/eventhub/event-hubs/src/receiver.ts b/sdk/eventhub/event-hubs/src/receiver.ts index 89ee9e6bbadb..c069e4115eff 100644 --- a/sdk/eventhub/event-hubs/src/receiver.ts +++ b/sdk/eventhub/event-hubs/src/receiver.ts @@ -73,7 +73,7 @@ export class EventHubConsumer { /** * @property The set of retry options to configure the receiveBatch operation. */ - private _retryOptions: Required>; + private _retryOptions: RetryOptions; /** * @property Returns `true` if the consumer is closed. This can happen either because the consumer @@ -112,7 +112,7 @@ export class EventHubConsumer { * @readonly */ get ownerLevel(): number | undefined { - return this._receiverOptions && this._receiverOptions.ownerLevel; + return this._receiverOptions.ownerLevel; } /** @@ -139,7 +139,7 @@ export class EventHubConsumer { this._consumerGroup = consumerGroup; this._partitionId = partitionId; this._receiverOptions = options || {}; - this._retryOptions = this._initRetryOptions(this._receiverOptions.retryOptions); + this._retryOptions = this._receiverOptions.retryOptions || {}; this._baseConsumer = new EventHubReceiver( context, consumerGroup, @@ -370,7 +370,7 @@ export class EventHubConsumer { ); const addTimeout = (): void => { - let msg = "[%s] Setting the wait timer for %d seconds for receiver '%s'."; + const msg = "[%s] Setting the wait timer for %d seconds for receiver '%s'."; log.batching( msg, this._context.connectionId, @@ -404,10 +404,16 @@ export class EventHubConsumer { const config: RetryConfig = { connectionHost: this._context.config.host, connectionId: this._context.connectionId, - delayInSeconds: retryOptions.retryInterval, + delayInSeconds: + typeof retryOptions.retryInterval === "number" && retryOptions.retryInterval > 0 + ? retryOptions.retryInterval / 1000 + : Constants.defaultDelayBetweenOperationRetriesInSeconds, operation: retrieveEvents, operationType: RetryOperationType.receiveMessage, - maxRetries: retryOptions.maxRetries + maxRetries: retryOptions.maxRetries, + retryPolicy: retryOptions.retryPolicy, + minExponentialRetryDelayInMs: retryOptions.minExponentialRetryDelayInMs, + maxExponentialRetryDelayInMs: retryOptions.maxExponentialRetryDelayInMs }; return retry(config); } @@ -436,24 +442,6 @@ export class EventHubConsumer { } } - private _initRetryOptions( - retryOptions: RetryOptions = {} - ): Required> { - const maxRetries = - typeof retryOptions.maxRetries === "number" - ? retryOptions.maxRetries - : Constants.defaultMaxRetries; - const retryInterval = - typeof retryOptions.retryInterval === "number" && retryOptions.retryInterval > 0 - ? retryOptions.retryInterval / 1000 - : Constants.defaultDelayBetweenOperationRetriesInSeconds; - - return { - maxRetries, - retryInterval - }; - } - private _throwIfAlreadyReceiving(): void { if (this.isReceivingMessages) { const errorMessage = `The EventHubConsumer for "${this._context.config.entityPath}" is already receiving messages.`;